Skip to content

Commit 2afef2b

Browse files
Remove physical module layout info from read sets (#5149)
# Description of Changes I believe this to fix #4947. Previously, committed read sets stored module-local indexes used for view dispatch. These indexes are not stable across module updates, but they were not removed from the committed state, so that later when a write triggered a view refresh, the runtime could dispatch the wrong view function or attempt to materialize into the wrong backing table which would result in a fatal error. This patch removes these unstable indexes from read sets. The core behavior change is that committed read sets no longer depend on stale per-module layout details. They identify the logical view call, and refresh resolves the current module metadata when needed. # API and ABI breaking changes N/A # Expected complexity level and risk 2 # Testing - [x] Auto-migrate smoketests
1 parent d31301a commit 2afef2b

18 files changed

Lines changed: 558 additions & 94 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1616
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1717
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
1818
};
19-
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId};
19+
use spacetimedb_datastore::locking_tx_datastore::{
20+
ApplyHistoryCounters, IndexScanPointOrRange, MutTxId, TxId, ViewCallInfo,
21+
};
2022
use spacetimedb_datastore::system_tables::{
2123
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
2224
};
@@ -1601,6 +1603,26 @@ impl RelationalDB {
16011603
Ok(())
16021604
}
16031605

1606+
/// Materialize a view call and replace its committed read set on transaction commit.
1607+
///
1608+
/// The read set is only marked for replacement after materialization succeeds. If the
1609+
/// transaction rolls back, the replacement marker rolls back with it.
1610+
pub fn materialize_view_call(
1611+
&self,
1612+
tx: &mut MutTxId,
1613+
table_id: TableId,
1614+
view_call: ViewCallInfo,
1615+
rows: Vec<ProductValue>,
1616+
) -> Result<(), DBError> {
1617+
match view_call.sender {
1618+
Some(sender) => self.materialize_view(tx, table_id, sender, rows)?,
1619+
None => self.materialize_anonymous_view(tx, table_id, rows)?,
1620+
}
1621+
tx.replace_view_read_set(view_call);
1622+
1623+
Ok(())
1624+
}
1625+
16041626
fn write_view_rows(
16051627
&self,
16061628
tx: &mut MutTxId,

crates/core/src/host/module_host.rs

Lines changed: 75 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,66 @@ pub struct CallViewParams {
10891089
pub timestamp: Timestamp,
10901090
}
10911091

1092+
pub(crate) struct ResolvedViewForRefresh<'a> {
1093+
pub view_id: ViewId,
1094+
pub table_id: TableId,
1095+
pub view_def: &'a ViewDef,
1096+
}
1097+
1098+
/// Lookup a module's [`ViewDef`] and check for consistency among
1099+
/// its readset, `st_view`, and the [`ModuleDef`].
1100+
pub(crate) fn resolve_view_for_refresh<'a>(
1101+
tx: &MutTxId,
1102+
module_def: &'a ModuleDef,
1103+
view_call: &ViewCallInfo,
1104+
) -> anyhow::Result<ResolvedViewForRefresh<'a>> {
1105+
let st_view = tx
1106+
.lookup_st_view(view_call.view_id)
1107+
.with_context(|| format!("failed to look up view {}", view_call.view_id))?;
1108+
1109+
let view_id = st_view.view_id;
1110+
let table_id = st_view
1111+
.table_id
1112+
.ok_or_else(|| anyhow::anyhow!("view {:?} does not have a backing table", view_id))?;
1113+
1114+
let view_name: Identifier = st_view.view_name.into();
1115+
let view_def = module_def.view(&view_name).ok_or_else(|| {
1116+
anyhow::anyhow!(
1117+
"view `{}` for view id `{}` not found in current module",
1118+
view_name,
1119+
view_id
1120+
)
1121+
})?;
1122+
1123+
let is_anonymous = view_call.sender.is_none();
1124+
1125+
if st_view.is_anonymous != is_anonymous {
1126+
return Err(anyhow::anyhow!(
1127+
"found is_anonymous={} in st_view, but {} in readset when updating view `{}`",
1128+
st_view.is_anonymous,
1129+
is_anonymous,
1130+
view_name,
1131+
));
1132+
}
1133+
1134+
let is_anonymous = view_def.is_anonymous;
1135+
1136+
if st_view.is_anonymous != is_anonymous {
1137+
return Err(anyhow::anyhow!(
1138+
"found is_anonymous={} in st_view, but {} in module when updating view `{}`",
1139+
st_view.is_anonymous,
1140+
is_anonymous,
1141+
view_name,
1142+
));
1143+
}
1144+
1145+
Ok(ResolvedViewForRefresh {
1146+
view_id,
1147+
table_id,
1148+
view_def,
1149+
})
1150+
}
1151+
10921152
pub struct CallProcedureParams {
10931153
pub timestamp: Timestamp,
10941154
pub caller_identity: Identity,
@@ -2897,17 +2957,21 @@ impl ModuleHost {
28972957
let mut total_duration = Duration::ZERO;
28982958
let mut abi_duration = Duration::ZERO;
28992959
let mut trapped = false;
2900-
for ViewCallInfo {
2901-
view_id,
2902-
table_id,
2903-
fn_ptr,
2904-
sender,
2905-
} in tx.views_for_refresh().cloned().collect::<Vec<_>>()
2906-
{
2907-
let Some(view_def) = module_def.get_view_by_id(fn_ptr, sender.is_none()) else {
2908-
outcome = ViewOutcome::Failed(format!("view with fn_ptr `{fn_ptr}` not found"));
2909-
break;
2960+
for view_call in tx.views_for_refresh().cloned().collect::<Vec<_>>() {
2961+
let sender = view_call.sender;
2962+
let resolved = match resolve_view_for_refresh(&tx, module_def, &view_call) {
2963+
Ok(resolved) => resolved,
2964+
Err(err) => {
2965+
outcome = ViewOutcome::Failed(format!("failed to resolve view: {err}"));
2966+
break;
2967+
}
29102968
};
2969+
let ResolvedViewForRefresh {
2970+
view_id,
2971+
table_id,
2972+
view_def,
2973+
} = resolved;
2974+
let view_name = &view_def.name;
29112975
let args = match FunctionArgs::Nullary.into_tuple_for_def(module_def, view_def) {
29122976
Ok(args) => args,
29132977
Err(err) => {
@@ -2919,7 +2983,7 @@ impl ModuleHost {
29192983
let (result, trap) = Self::call_view_inner(
29202984
instance,
29212985
tx,
2922-
&view_def.name,
2986+
view_name,
29232987
view_id,
29242988
table_id,
29252989
view_def.fn_ptr,

crates/core/src/host/v8/syscall/common.rs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use anyhow::Context;
2525
use bytes::Bytes;
2626
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
2727
use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp};
28-
use spacetimedb_primitives::{ColId, IndexId, ProcedureId, TableId};
28+
use spacetimedb_primitives::{ColId, IndexId, ProcedureId, TableId, ViewFnPtr};
2929
use spacetimedb_sats::bsatn;
3030
use spacetimedb_schema::def::ModuleDef;
3131
use spacetimedb_schema::identifier::Identifier;
@@ -752,19 +752,22 @@ fn refresh_views(
752752

753753
for view_call in views_for_refresh {
754754
let res: SysCallResult<()> = (|| {
755-
let view_def = module_def
756-
.get_view_by_id(view_call.fn_ptr, view_call.sender.is_none())
757-
.ok_or_else(|| {
758-
TypeError(format!(
759-
"view with fn_ptr `{}` not found while refreshing procedure transaction",
760-
view_call.fn_ptr
761-
))
762-
.throw(scope)
763-
})?;
755+
let resolved = crate::host::module_host::resolve_view_for_refresh(
756+
tx.as_ref().expect("procedure tx missing during view refresh"),
757+
module_def,
758+
&view_call,
759+
)
760+
.map_err(|err| TypeError(format!("view refresh failed after procedure call: {err}")).throw(scope))?;
761+
762+
let table_id = resolved.table_id;
763+
let view_def = resolved.view_def;
764+
let view_name = &view_def.name;
765+
let fn_ptr = view_def.fn_ptr;
764766

765767
let current_tx = tx.take().expect("procedure tx missing during view refresh");
766-
let (next_tx, call_result) =
767-
tx_slot.set(current_tx, || call_view(scope, hooks, &view_call, &view_def.name));
768+
let (next_tx, call_result) = tx_slot.set(current_tx, || {
769+
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr)
770+
});
768771
tx = Some(next_tx);
769772
let return_data = call_result?;
770773

@@ -814,25 +817,14 @@ fn refresh_views(
814817
})?,
815818
};
816819

817-
match view_call.sender {
818-
Some(sender) => stdb
819-
.materialize_view(
820-
tx.as_mut()
821-
.expect("procedure tx missing while materializing authenticated view"),
822-
view_call.table_id,
823-
sender,
824-
rows,
825-
)
826-
.map_err(NodesError::from)?,
827-
None => stdb
828-
.materialize_anonymous_view(
829-
tx.as_mut()
830-
.expect("procedure tx missing while materializing anonymous view"),
831-
view_call.table_id,
832-
rows,
833-
)
834-
.map_err(NodesError::from)?,
835-
}
820+
stdb.materialize_view_call(
821+
tx.as_mut()
822+
.expect("procedure tx missing while materializing refreshed view"),
823+
table_id,
824+
view_call.clone(),
825+
rows,
826+
)
827+
.map_err(NodesError::from)?;
836828

837829
Ok(())
838830
})();
@@ -857,6 +849,8 @@ fn call_view(
857849
hooks: &HookFunctions<'_>,
858850
view_call: &ViewCallInfo,
859851
view_name: &Identifier,
852+
table_id: TableId,
853+
fn_ptr: ViewFnPtr,
860854
) -> SysCallResult<ViewReturnData> {
861855
let prev_func_type = get_env(scope)?
862856
.instance_env
@@ -871,8 +865,8 @@ fn call_view(
871865
ViewOp {
872866
name: view_name,
873867
view_id: view_call.view_id,
874-
table_id: view_call.table_id,
875-
fn_ptr: view_call.fn_ptr,
868+
table_id,
869+
fn_ptr,
876870
args: &args,
877871
sender: &sender,
878872
timestamp: Timestamp::now(),
@@ -884,8 +878,8 @@ fn call_view(
884878
AnonymousViewOp {
885879
name: view_name,
886880
view_id: view_call.view_id,
887-
table_id: view_call.table_id,
888-
fn_ptr: view_call.fn_ptr,
881+
table_id,
882+
fn_ptr,
889883
args: &args,
890884
timestamp: Timestamp::now(),
891885
},

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,6 +1359,7 @@ impl InstanceCommon {
13591359
(Ok(raw), sender) => {
13601360
// This is wrapped in a closure to simplify error handling.
13611361
let outcome: Result<ViewOutcome, anyhow::Error> = (|| {
1362+
let view_call = ViewCallInfo { view_id, sender };
13621363
let result = ViewResult::from_return_data(raw).context("Error parsing view result")?;
13631364
let typespace = self.info.module_def.typespace();
13641365
let row_product_type = typespace
@@ -1371,28 +1372,14 @@ impl InstanceCommon {
13711372
ViewResult::Rows(bytes) => deserialize_view_rows(row_type, bytes, typespace)
13721373
.context("Error deserializing rows returned by view".to_string())?,
13731374
ViewResult::RawSql(query) => self
1374-
.run_query_for_view(
1375-
&mut tx,
1376-
&query,
1377-
&row_product_type,
1378-
&ViewCallInfo {
1379-
view_id,
1380-
table_id,
1381-
fn_ptr,
1382-
sender,
1383-
},
1384-
)
1375+
.run_query_for_view(&mut tx, &query, &row_product_type, &view_call)
13851376
.context("Error executing raw SQL returned by view".to_string())?,
13861377
};
13871378

13881379
let replica_ctx = inst.replica_ctx();
13891380
let stdb = replica_ctx.relational_db();
1390-
let res = match sender {
1391-
Some(sender) => stdb.materialize_view(&mut tx, table_id, sender, rows),
1392-
None => stdb.materialize_anonymous_view(&mut tx, table_id, rows),
1393-
};
1394-
1395-
res.context("Error materializing view")?;
1381+
stdb.materialize_view_call(&mut tx, table_id, view_call, rows)
1382+
.context("Error materializing view")?;
13961383

13971384
Ok(ViewOutcome::Success)
13981385
})();
@@ -1798,8 +1785,6 @@ impl InstanceOp for ViewOp<'_> {
17981785
fn call_type(&self) -> FuncCallType {
17991786
FuncCallType::View(ViewCallInfo {
18001787
view_id: self.view_id,
1801-
table_id: self.table_id,
1802-
fn_ptr: self.fn_ptr,
18031788
sender: Some(*self.sender),
18041789
})
18051790
}
@@ -1828,8 +1813,6 @@ impl InstanceOp for AnonymousViewOp<'_> {
18281813
fn call_type(&self) -> FuncCallType {
18291814
FuncCallType::View(ViewCallInfo {
18301815
view_id: self.view_id,
1831-
table_id: self.table_id,
1832-
fn_ptr: self.fn_ptr,
18331816
sender: None,
18341817
})
18351818
}

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use spacetimedb_data_structures::map::IntMap;
1919
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
2020
use spacetimedb_lib::{bsatn, ConnectionId, Timestamp};
2121
use spacetimedb_primitives::errno::HOST_CALL_FAILURE;
22-
use spacetimedb_primitives::{errno, ColId};
22+
use spacetimedb_primitives::{errno, ColId, ViewFnPtr};
2323
use spacetimedb_schema::def::ModuleDef;
2424
use spacetimedb_schema::identifier::Identifier;
2525
use std::future::Future;
@@ -1697,13 +1697,20 @@ impl WasmInstanceEnv {
16971697

16981698
for view_call in views_for_refresh {
16991699
let res: anyhow::Result<()> = (|| {
1700-
let view_def = module_def
1701-
.get_view_by_id(view_call.fn_ptr, view_call.sender.is_none())
1702-
.ok_or_else(|| anyhow!("view with fn_ptr `{}` not found", view_call.fn_ptr))?;
1700+
let resolved = crate::host::module_host::resolve_view_for_refresh(
1701+
tx.as_ref().expect("procedure tx missing during view refresh"),
1702+
&module_def,
1703+
&view_call,
1704+
)?;
1705+
1706+
let table_id = resolved.table_id;
1707+
let view_def = resolved.view_def;
1708+
let view_name = &view_def.name;
1709+
let fn_ptr = view_def.fn_ptr;
17031710

17041711
let current_tx = tx.take().expect("procedure tx missing during view refresh");
17051712
let (next_tx, call_result) =
1706-
tx_slot.set(current_tx, || Self::call_view(caller, &view_call, &view_def.name));
1713+
tx_slot.set(current_tx, || Self::call_view(caller, &view_call, view_name, fn_ptr));
17071714
tx = Some(next_tx);
17081715
let return_data = call_result?;
17091716

@@ -1727,21 +1734,13 @@ impl WasmInstanceEnv {
17271734
};
17281735

17291736
let stdb = caller.data().instance_env.relational_db().clone();
1730-
match view_call.sender {
1731-
Some(sender) => stdb.materialize_view(
1732-
tx.as_mut()
1733-
.expect("procedure tx missing while materializing authenticated view"),
1734-
view_call.table_id,
1735-
sender,
1736-
rows,
1737-
)?,
1738-
None => stdb.materialize_anonymous_view(
1739-
tx.as_mut()
1740-
.expect("procedure tx missing while materializing anonymous view"),
1741-
view_call.table_id,
1742-
rows,
1743-
)?,
1744-
}
1737+
stdb.materialize_view_call(
1738+
tx.as_mut()
1739+
.expect("procedure tx missing while materializing refreshed view"),
1740+
table_id,
1741+
view_call.clone(),
1742+
rows,
1743+
)?;
17451744

17461745
Ok(())
17471746
})();
@@ -1766,6 +1765,7 @@ impl WasmInstanceEnv {
17661765
caller: &mut Caller<'a, Self>,
17671766
view_call: &ViewCallInfo,
17681767
view_name: &Identifier,
1768+
fn_ptr: ViewFnPtr,
17691769
) -> anyhow::Result<ViewReturnData> {
17701770
let prev_func_type = caller
17711771
.data_mut()
@@ -1792,7 +1792,7 @@ impl WasmInstanceEnv {
17921792
call_view,
17931793
call_view_anon,
17941794
view_name,
1795-
view_call.fn_ptr.0,
1795+
fn_ptr.0,
17961796
view_call.sender,
17971797
args_source.0,
17981798
result_sink,

0 commit comments

Comments
 (0)