Skip to content

Commit c009c67

Browse files
Track view read sets by arg hash (#5443)
# Description of Changes Removes `arg_id` from read sets, and also moves view lifecycle management from `st_view_sub` into the committed state. Trying to move away from system tables and persistent state in general for views since this has been the source of many issues. # API and ABI breaking changes None # Expected complexity level and risk 2 # Testing Existing coverage.
1 parent 5895dbd commit c009c67

11 files changed

Lines changed: 502 additions & 377 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
4747
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet};
4848
use spacetimedb_datastore::error::DatastoreError;
4949
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
50-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
50+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo, ViewInstanceArgs};
5151
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
5252
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
5353
use spacetimedb_engine::sql::rls::RowLevelExpr;
@@ -60,7 +60,7 @@ use spacetimedb_lib::http::{Request as HttpRequest, Response as HttpResponse};
6060
use spacetimedb_lib::identity::{AuthCtx, RequestId};
6161
use spacetimedb_lib::metrics::ExecutionMetrics;
6262
use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp};
63-
use spacetimedb_primitives::{ArgId, HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
63+
use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
6464
use spacetimedb_query::compile_subscription;
6565
use spacetimedb_sats::raw_identifier::RawIdentifier;
6666
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue};
@@ -1121,17 +1121,6 @@ pub(crate) fn resolve_view_for_refresh<'a>(
11211121
)
11221122
})?;
11231123

1124-
let is_anonymous = view_call.sender.is_none();
1125-
1126-
if st_view.is_anonymous != is_anonymous {
1127-
return Err(anyhow::anyhow!(
1128-
"found is_anonymous={} in st_view, but {} in readset when updating view `{}`",
1129-
st_view.is_anonymous,
1130-
is_anonymous,
1131-
view_name,
1132-
));
1133-
}
1134-
11351124
let is_anonymous = view_def.is_anonymous;
11361125

11371126
if st_view.is_anonymous != is_anonymous {
@@ -2867,11 +2856,10 @@ impl ModuleHost {
28672856
}
28682857

28692858
/// Materializes the views return by the `view_collector`, if not already materialized,
2870-
/// and updates `st_view_sub` accordingly.
2859+
/// and updates view lifecycle state accordingly.
28712860
///
2872-
/// Passing [`Workload::Sql`] will update `st_view_sub.last_called`.
2873-
/// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`,
2874-
/// in addition to updating `st_view_sub.last_called`.
2861+
/// Passing [`Workload::Sql`] will update the instance's last-used timestamp.
2862+
/// Passing [`Workload::Subscribe`] will also increment the subscriber's refcount.
28752863
pub fn materialize_views<I: WasmInstance>(
28762864
mut tx: MutTxId,
28772865
instance: &mut RefInstance<'_, I>,
@@ -2888,12 +2876,14 @@ impl ModuleHost {
28882876
let view_id = st_view_row.view_id;
28892877
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
28902878
let is_anonymous = st_view_row.is_anonymous;
2891-
let sender = if is_anonymous { None } else { Some(caller) };
2892-
let is_materialized = if is_anonymous {
2893-
tx.is_anonymous_view_materialized(view_id)?
2879+
let args = if is_anonymous {
2880+
ViewInstanceArgs::Anonymous
28942881
} else {
2895-
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
2882+
ViewInstanceArgs::Sender(caller)
28962883
};
2884+
let view_call = ViewCallInfo::from_args(view_id, args);
2885+
let sender = args.sender();
2886+
let is_materialized = tx.is_view_materialized(&view_call)?;
28972887
if !is_materialized {
28982888
let (res, trapped) =
28992889
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
@@ -2904,11 +2894,11 @@ impl ModuleHost {
29042894
}
29052895
// If this is a sql call, we only update this view's "last called" timestamp
29062896
if let Workload::Sql = workload {
2907-
tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?;
2897+
tx.update_view_timestamp(view_call.clone(), args)?;
29082898
}
29092899
// If this is a subscribe call, we also increment this view's subscriber count
29102900
if let Workload::Subscribe = workload {
2911-
tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?;
2901+
tx.subscribe_view(view_call, args, caller)?;
29122902
}
29132903
}
29142904
Ok((tx, false))
@@ -2944,14 +2934,23 @@ impl ModuleHost {
29442934
let mut abi_duration = Duration::ZERO;
29452935
let mut trapped = false;
29462936
for view_call in tx.views_for_refresh().cloned().collect::<Vec<_>>() {
2947-
let sender = view_call.sender;
29482937
let resolved = match resolve_view_for_refresh(&tx, module_def, &view_call) {
29492938
Ok(resolved) => resolved,
29502939
Err(err) => {
29512940
outcome = ViewOutcome::Failed(format!("failed to resolve view: {err}"));
29522941
break;
29532942
}
29542943
};
2944+
let sender = match tx.view_instance_args(&view_call) {
2945+
Some(args) => args.sender(),
2946+
None => {
2947+
outcome = ViewOutcome::Failed(format!(
2948+
"failed to look up materialized view args for view {}",
2949+
view_call.view_id
2950+
));
2951+
break;
2952+
}
2953+
};
29552954
let ResolvedViewForRefresh {
29562955
view_id,
29572956
table_id,

crates/core/src/host/v8/syscall/common.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,10 +763,22 @@ fn refresh_views(
763763
let view_def = resolved.view_def;
764764
let view_name = &view_def.name;
765765
let fn_ptr = view_def.fn_ptr;
766+
let sender = tx
767+
.as_ref()
768+
.expect("procedure tx missing while looking up refreshed view args")
769+
.view_instance_args(&view_call)
770+
.ok_or_else(|| {
771+
TypeError(format!(
772+
"failed to look up materialized view args for view {}",
773+
view_call.view_id
774+
))
775+
.throw(scope)
776+
})?
777+
.sender();
766778

767779
let current_tx = tx.take().expect("procedure tx missing during view refresh");
768780
let (next_tx, call_result) = tx_slot.set(current_tx, || {
769-
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr)
781+
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr, sender)
770782
});
771783
tx = Some(next_tx);
772784
let return_data = call_result?;
@@ -851,14 +863,15 @@ fn call_view(
851863
view_name: &Identifier,
852864
table_id: TableId,
853865
fn_ptr: ViewFnPtr,
866+
sender: Option<Identity>,
854867
) -> SysCallResult<ViewReturnData> {
855868
let prev_func_type = get_env(scope)?
856869
.instance_env
857870
.swap_func_type(FuncCallType::View(view_call.clone()));
858871

859872
let result = {
860873
let args = crate::host::ArgsTuple::nullary();
861-
match view_call.sender {
874+
match sender {
862875
Some(sender) => call_call_view(
863876
scope,
864877
hooks,

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ impl InstanceCommon {
758758
}
759759
}
760760

761-
/// Re-evaluates all views which have entries in `st_view_subs`.
761+
/// Re-evaluates all materialized view instances tracked in view lifecycle state.
762762
fn evaluate_subscribed_views<I: WasmInstance>(
763763
&mut self,
764764
tx: MutTxId,
@@ -1367,7 +1367,10 @@ impl InstanceCommon {
13671367
(Ok(raw), sender) => {
13681368
// This is wrapped in a closure to simplify error handling.
13691369
let outcome: Result<ViewOutcome, anyhow::Error> = (|| {
1370-
let view_call = ViewCallInfo { view_id, sender };
1370+
let view_call = match sender {
1371+
Some(sender) => ViewCallInfo::sender(view_id, sender),
1372+
None => ViewCallInfo::anonymous(view_id),
1373+
};
13711374
let result = ViewResult::from_return_data(raw).context("Error parsing view result")?;
13721375
let typespace = self.info.module_def.typespace();
13731376
let row_product_type = typespace
@@ -1517,10 +1520,10 @@ fn collect_subscribed_view_calls(
15171520
let table_id = st_view
15181521
.table_id
15191522
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
1520-
let subs = tx.lookup_st_view_subs(view_id)?;
1523+
let view_instances = tx.materialized_view_instances_for_view(view_id);
15211524

15221525
if *is_anonymous {
1523-
if subs.is_empty() {
1526+
if view_instances.is_empty() {
15241527
continue;
15251528
}
15261529
view_calls.push(CallViewParams {
@@ -1537,14 +1540,17 @@ fn collect_subscribed_view_calls(
15371540
continue;
15381541
}
15391542

1540-
for sub in subs {
1543+
for args in view_instances {
1544+
let Some(sender) = args.sender() else {
1545+
continue;
1546+
};
15411547
view_calls.push(CallViewParams {
15421548
view_name: view_name.clone(),
15431549
view_id,
15441550
table_id,
15451551
fn_ptr: *fn_ptr,
15461552
caller: owner_identity,
1547-
sender: Some(sub.identity.into()),
1553+
sender: Some(sender),
15481554
args: ArgsTuple::nullary(),
15491555
row_type: *product_type_ref,
15501556
timestamp: Timestamp::now(),
@@ -1791,10 +1797,7 @@ impl InstanceOp for ViewOp<'_> {
17911797
}
17921798

17931799
fn call_type(&self) -> FuncCallType {
1794-
FuncCallType::View(ViewCallInfo {
1795-
view_id: self.view_id,
1796-
sender: Some(*self.sender),
1797-
})
1800+
FuncCallType::View(ViewCallInfo::sender(self.view_id, *self.sender))
17981801
}
17991802
}
18001803

@@ -1819,10 +1822,7 @@ impl InstanceOp for AnonymousViewOp<'_> {
18191822
}
18201823

18211824
fn call_type(&self) -> FuncCallType {
1822-
FuncCallType::View(ViewCallInfo {
1823-
view_id: self.view_id,
1824-
sender: None,
1825-
})
1825+
FuncCallType::View(ViewCallInfo::anonymous(self.view_id))
18261826
}
18271827
}
18281828

@@ -1920,9 +1920,9 @@ impl InstanceOp for HttpHandlerOp {
19201920
mod tests {
19211921
use super::collect_subscribed_view_calls;
19221922
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
1923+
use spacetimedb_datastore::locking_tx_datastore::{ViewCallInfo, ViewInstanceArgs};
19231924
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
19241925
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
1925-
use spacetimedb_primitives::ArgId;
19261926
use spacetimedb_sats::raw_identifier::RawIdentifier;
19271927
use spacetimedb_schema::def::ModuleDef;
19281928

@@ -1960,8 +1960,9 @@ mod tests {
19601960

19611961
let mut tx = begin_mut_tx(&stdb);
19621962
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1963-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1964-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1963+
let view_call = ViewCallInfo::anonymous(view_id);
1964+
tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, Identity::ZERO)?;
1965+
tx.subscribe_view(view_call, ViewInstanceArgs::Anonymous, Identity::ONE)?;
19651966

19661967
// Two subscriber rows exist, but anonymous views should still be reevaluated once
19671968
// because they share a single materialization.
@@ -1989,8 +1990,10 @@ mod tests {
19891990

19901991
let mut tx = begin_mut_tx(&stdb);
19911992
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1992-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1993-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1993+
let zero_args = ViewInstanceArgs::Sender(Identity::ZERO);
1994+
let one_args = ViewInstanceArgs::Sender(Identity::ONE);
1995+
tx.subscribe_view(ViewCallInfo::from_args(view_id, zero_args), zero_args, Identity::ZERO)?;
1996+
tx.subscribe_view(ViewCallInfo::from_args(view_id, one_args), one_args, Identity::ONE)?;
19941997

19951998
// Sender-backed views keep one materialization per sender, so reevaluation must
19961999
// preserve both callers.

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1777,10 +1777,22 @@ impl WasmInstanceEnv {
17771777
let view_def = resolved.view_def;
17781778
let view_name = &view_def.name;
17791779
let fn_ptr = view_def.fn_ptr;
1780+
let sender = tx
1781+
.as_ref()
1782+
.expect("procedure tx missing while looking up refreshed view args")
1783+
.view_instance_args(&view_call)
1784+
.ok_or_else(|| {
1785+
anyhow!(
1786+
"failed to look up materialized view args for view {}",
1787+
view_call.view_id
1788+
)
1789+
})?
1790+
.sender();
17801791

17811792
let current_tx = tx.take().expect("procedure tx missing during view refresh");
1782-
let (next_tx, call_result) =
1783-
tx_slot.set(current_tx, || Self::call_view(caller, &view_call, view_name, fn_ptr));
1793+
let (next_tx, call_result) = tx_slot.set(current_tx, || {
1794+
Self::call_view(caller, &view_call, view_name, fn_ptr, sender)
1795+
});
17841796
tx = Some(next_tx);
17851797
let return_data = call_result?;
17861798

@@ -1836,6 +1848,7 @@ impl WasmInstanceEnv {
18361848
view_call: &ViewCallInfo,
18371849
view_name: &Identifier,
18381850
fn_ptr: ViewFnPtr,
1851+
sender: Option<Identity>,
18391852
) -> anyhow::Result<ViewReturnData> {
18401853
let prev_func_type = caller
18411854
.data_mut()
@@ -1863,7 +1876,7 @@ impl WasmInstanceEnv {
18631876
call_view_anon,
18641877
view_name,
18651878
fn_ptr.0,
1866-
view_call.sender,
1879+
sender,
18671880
args_source.0,
18681881
result_sink,
18691882
true,

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, HashSet}
3232
use spacetimedb_datastore::db_metrics::DB_METRICS;
3333
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
3434
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
35-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
35+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo};
3636
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
3737
use spacetimedb_durability::TxOffset;
3838
use spacetimedb_execution::ExecutionParams;
@@ -42,7 +42,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
4242
use spacetimedb_lib::Identity;
4343
use spacetimedb_lib::{bsatn, identity::AuthCtx};
4444
use spacetimedb_physical_plan::plan::ProjectPlan;
45-
use spacetimedb_primitives::ArgId;
4645
use spacetimedb_schema::def::RawModuleDefVersion;
4746
use spacetimedb_table::static_assert_size;
4847
use std::{
@@ -1855,7 +1854,7 @@ impl ModuleSubscriptions {
18551854
/// and subsequently downgrades to a read-only transaction.
18561855
///
18571856
/// Unlike [`Self::materialize_views_and_downgrade_tx`] which populates the views' backing tables,
1858-
/// this method just decrements the subscriber count in `st_view_sub`.
1857+
/// this method just decrements the subscriber count in view lifecycle state.
18591858
/// Views without any subscribers are cleaned up async.
18601859
fn unsubscribe_views_and_downgrade_tx(
18611860
&self,
@@ -1869,7 +1868,7 @@ impl ModuleSubscriptions {
18691868
Ok(self.guard_tx(tx, opts))
18701869
}
18711870

1872-
/// We unsubscribe from views by decrementing the subscriber count in `st_view_sub`.
1871+
/// We unsubscribe from views by decrementing the subscriber count in the view lifecycle state.
18731872
/// Views without any subscribers are cleaned up async.
18741873
fn _unsubscribe_views(
18751874
tx: &mut MutTxId,
@@ -1879,7 +1878,13 @@ impl ModuleSubscriptions {
18791878
let mut view_ids = HashSet::new();
18801879
view_collector.collect_views(&mut view_ids);
18811880
for view_id in view_ids {
1882-
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
1881+
let is_anonymous = tx.lookup_st_view(view_id)?.is_anonymous;
1882+
let view_call = if is_anonymous {
1883+
ViewCallInfo::anonymous(view_id)
1884+
} else {
1885+
ViewCallInfo::sender(view_id, sender)
1886+
};
1887+
tx.unsubscribe_view(view_call, sender)?;
18831888
}
18841889
Ok(())
18851890
}

0 commit comments

Comments
 (0)