Skip to content

Commit 64b376a

Browse files
Track view read sets by arg hash
1 parent 41d935b commit 64b376a

10 files changed

Lines changed: 498 additions & 378 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
4747
use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet};
4848
use spacetimedb_datastore::error::DatastoreError;
4949
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
50-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
50+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo, ViewInstanceArgs};
5151
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
5252
pub use spacetimedb_durability::{DurabilityExited, DurableOffset};
5353
use spacetimedb_engine::sql::rls::RowLevelExpr;
@@ -60,7 +60,7 @@ use spacetimedb_lib::http::{Request as HttpRequest, Response as HttpResponse};
6060
use spacetimedb_lib::identity::{AuthCtx, RequestId};
6161
use spacetimedb_lib::metrics::ExecutionMetrics;
6262
use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp};
63-
use spacetimedb_primitives::{ArgId, HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
63+
use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId};
6464
use spacetimedb_query::compile_subscription;
6565
use spacetimedb_sats::raw_identifier::RawIdentifier;
6666
use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue};
@@ -1103,17 +1103,6 @@ pub(crate) fn resolve_view_for_refresh<'a>(
11031103
)
11041104
})?;
11051105

1106-
let is_anonymous = view_call.sender.is_none();
1107-
1108-
if st_view.is_anonymous != is_anonymous {
1109-
return Err(anyhow::anyhow!(
1110-
"found is_anonymous={} in st_view, but {} in readset when updating view `{}`",
1111-
st_view.is_anonymous,
1112-
is_anonymous,
1113-
view_name,
1114-
));
1115-
}
1116-
11171106
let is_anonymous = view_def.is_anonymous;
11181107

11191108
if st_view.is_anonymous != is_anonymous {
@@ -2849,11 +2838,10 @@ impl ModuleHost {
28492838
}
28502839

28512840
/// Materializes the views return by the `view_collector`, if not already materialized,
2852-
/// and updates `st_view_sub` accordingly.
2841+
/// and updates view lifecycle state accordingly.
28532842
///
2854-
/// Passing [`Workload::Sql`] will update `st_view_sub.last_called`.
2855-
/// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`,
2856-
/// in addition to updating `st_view_sub.last_called`.
2843+
/// Passing [`Workload::Sql`] will update the instance's last-used timestamp.
2844+
/// Passing [`Workload::Subscribe`] will also increment the subscriber's refcount.
28572845
pub fn materialize_views<I: WasmInstance>(
28582846
mut tx: MutTxId,
28592847
instance: &mut RefInstance<'_, I>,
@@ -2870,12 +2858,14 @@ impl ModuleHost {
28702858
let view_id = st_view_row.view_id;
28712859
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
28722860
let is_anonymous = st_view_row.is_anonymous;
2873-
let sender = if is_anonymous { None } else { Some(caller) };
2874-
let is_materialized = if is_anonymous {
2875-
tx.is_anonymous_view_materialized(view_id)?
2861+
let args = if is_anonymous {
2862+
ViewInstanceArgs::Anonymous
28762863
} else {
2877-
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
2864+
ViewInstanceArgs::Sender(caller)
28782865
};
2866+
let view_call = ViewCallInfo::from_args(view_id, args);
2867+
let sender = args.sender();
2868+
let is_materialized = tx.is_view_materialized(&view_call)?;
28792869
if !is_materialized {
28802870
let (res, trapped) =
28812871
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
@@ -2886,11 +2876,11 @@ impl ModuleHost {
28862876
}
28872877
// If this is a sql call, we only update this view's "last called" timestamp
28882878
if let Workload::Sql = workload {
2889-
tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?;
2879+
tx.update_view_timestamp(view_call.clone(), args)?;
28902880
}
28912881
// If this is a subscribe call, we also increment this view's subscriber count
28922882
if let Workload::Subscribe = workload {
2893-
tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?;
2883+
tx.subscribe_view(view_call, args, caller)?;
28942884
}
28952885
}
28962886
Ok((tx, false))
@@ -2926,14 +2916,23 @@ impl ModuleHost {
29262916
let mut abi_duration = Duration::ZERO;
29272917
let mut trapped = false;
29282918
for view_call in tx.views_for_refresh().cloned().collect::<Vec<_>>() {
2929-
let sender = view_call.sender;
29302919
let resolved = match resolve_view_for_refresh(&tx, module_def, &view_call) {
29312920
Ok(resolved) => resolved,
29322921
Err(err) => {
29332922
outcome = ViewOutcome::Failed(format!("failed to resolve view: {err}"));
29342923
break;
29352924
}
29362925
};
2926+
let sender = match tx.view_instance_args(&view_call) {
2927+
Some(args) => args.sender(),
2928+
None => {
2929+
outcome = ViewOutcome::Failed(format!(
2930+
"failed to look up materialized view args for view {}",
2931+
view_call.view_id
2932+
));
2933+
break;
2934+
}
2935+
};
29372936
let ResolvedViewForRefresh {
29382937
view_id,
29392938
table_id,

crates/core/src/host/v8/syscall/common.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -763,10 +763,22 @@ fn refresh_views(
763763
let view_def = resolved.view_def;
764764
let view_name = &view_def.name;
765765
let fn_ptr = view_def.fn_ptr;
766+
let sender = tx
767+
.as_ref()
768+
.expect("procedure tx missing while looking up refreshed view args")
769+
.view_instance_args(&view_call)
770+
.ok_or_else(|| {
771+
TypeError(format!(
772+
"failed to look up materialized view args for view {}",
773+
view_call.view_id
774+
))
775+
.throw(scope)
776+
})?
777+
.sender();
766778

767779
let current_tx = tx.take().expect("procedure tx missing during view refresh");
768780
let (next_tx, call_result) = tx_slot.set(current_tx, || {
769-
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr)
781+
call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr, sender)
770782
});
771783
tx = Some(next_tx);
772784
let return_data = call_result?;
@@ -851,14 +863,15 @@ fn call_view(
851863
view_name: &Identifier,
852864
table_id: TableId,
853865
fn_ptr: ViewFnPtr,
866+
sender: Option<Identity>,
854867
) -> SysCallResult<ViewReturnData> {
855868
let prev_func_type = get_env(scope)?
856869
.instance_env
857870
.swap_func_type(FuncCallType::View(view_call.clone()));
858871

859872
let result = {
860873
let args = crate::host::ArgsTuple::nullary();
861-
match view_call.sender {
874+
match sender {
862875
Some(sender) => call_call_view(
863876
scope,
864877
hooks,

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ impl InstanceCommon {
748748
}
749749
}
750750

751-
/// Re-evaluates all views which have entries in `st_view_subs`.
751+
/// Re-evaluates all materialized view instances tracked in view lifecycle state.
752752
fn evaluate_subscribed_views<I: WasmInstance>(
753753
&mut self,
754754
tx: MutTxId,
@@ -1346,7 +1346,10 @@ impl InstanceCommon {
13461346
(Ok(raw), sender) => {
13471347
// This is wrapped in a closure to simplify error handling.
13481348
let outcome: Result<ViewOutcome, anyhow::Error> = (|| {
1349-
let view_call = ViewCallInfo { view_id, sender };
1349+
let view_call = match sender {
1350+
Some(sender) => ViewCallInfo::sender(view_id, sender),
1351+
None => ViewCallInfo::anonymous(view_id),
1352+
};
13501353
let result = ViewResult::from_return_data(raw).context("Error parsing view result")?;
13511354
let typespace = self.info.module_def.typespace();
13521355
let row_product_type = typespace
@@ -1496,10 +1499,10 @@ fn collect_subscribed_view_calls(
14961499
let table_id = st_view
14971500
.table_id
14981501
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
1499-
let subs = tx.lookup_st_view_subs(view_id)?;
1502+
let view_instances = tx.subscribed_view_instances_for_view(view_id);
15001503

15011504
if *is_anonymous {
1502-
if subs.is_empty() {
1505+
if view_instances.is_empty() {
15031506
continue;
15041507
}
15051508
view_calls.push(CallViewParams {
@@ -1516,14 +1519,17 @@ fn collect_subscribed_view_calls(
15161519
continue;
15171520
}
15181521

1519-
for sub in subs {
1522+
for args in view_instances {
1523+
let Some(sender) = args.sender() else {
1524+
continue;
1525+
};
15201526
view_calls.push(CallViewParams {
15211527
view_name: view_name.clone(),
15221528
view_id,
15231529
table_id,
15241530
fn_ptr: *fn_ptr,
15251531
caller: owner_identity,
1526-
sender: Some(sub.identity.into()),
1532+
sender: Some(sender),
15271533
args: ArgsTuple::nullary(),
15281534
row_type: *product_type_ref,
15291535
timestamp: Timestamp::now(),
@@ -1770,10 +1776,7 @@ impl InstanceOp for ViewOp<'_> {
17701776
}
17711777

17721778
fn call_type(&self) -> FuncCallType {
1773-
FuncCallType::View(ViewCallInfo {
1774-
view_id: self.view_id,
1775-
sender: Some(*self.sender),
1776-
})
1779+
FuncCallType::View(ViewCallInfo::sender(self.view_id, *self.sender))
17771780
}
17781781
}
17791782

@@ -1798,10 +1801,7 @@ impl InstanceOp for AnonymousViewOp<'_> {
17981801
}
17991802

18001803
fn call_type(&self) -> FuncCallType {
1801-
FuncCallType::View(ViewCallInfo {
1802-
view_id: self.view_id,
1803-
sender: None,
1804-
})
1804+
FuncCallType::View(ViewCallInfo::anonymous(self.view_id))
18051805
}
18061806
}
18071807

@@ -1899,9 +1899,9 @@ impl InstanceOp for HttpHandlerOp {
18991899
mod tests {
19001900
use super::collect_subscribed_view_calls;
19011901
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
1902+
use spacetimedb_datastore::locking_tx_datastore::{ViewCallInfo, ViewInstanceArgs};
19021903
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
19031904
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
1904-
use spacetimedb_primitives::ArgId;
19051905
use spacetimedb_sats::raw_identifier::RawIdentifier;
19061906
use spacetimedb_schema::def::ModuleDef;
19071907

@@ -1939,8 +1939,9 @@ mod tests {
19391939

19401940
let mut tx = begin_mut_tx(&stdb);
19411941
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1942-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1943-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1942+
let view_call = ViewCallInfo::anonymous(view_id);
1943+
tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, Identity::ZERO)?;
1944+
tx.subscribe_view(view_call, ViewInstanceArgs::Anonymous, Identity::ONE)?;
19441945

19451946
// Two subscriber rows exist, but anonymous views should still be reevaluated once
19461947
// because they share a single materialization.
@@ -1968,8 +1969,10 @@ mod tests {
19681969

19691970
let mut tx = begin_mut_tx(&stdb);
19701971
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1971-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1972-
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1972+
let zero_args = ViewInstanceArgs::Sender(Identity::ZERO);
1973+
let one_args = ViewInstanceArgs::Sender(Identity::ONE);
1974+
tx.subscribe_view(ViewCallInfo::from_args(view_id, zero_args), zero_args, Identity::ZERO)?;
1975+
tx.subscribe_view(ViewCallInfo::from_args(view_id, one_args), one_args, Identity::ONE)?;
19731976

19741977
// Sender-backed views keep one materialization per sender, so reevaluation must
19751978
// preserve both callers.

crates/core/src/host/wasmtime/wasm_instance_env.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1777,10 +1777,22 @@ impl WasmInstanceEnv {
17771777
let view_def = resolved.view_def;
17781778
let view_name = &view_def.name;
17791779
let fn_ptr = view_def.fn_ptr;
1780+
let sender = tx
1781+
.as_ref()
1782+
.expect("procedure tx missing while looking up refreshed view args")
1783+
.view_instance_args(&view_call)
1784+
.ok_or_else(|| {
1785+
anyhow!(
1786+
"failed to look up materialized view args for view {}",
1787+
view_call.view_id
1788+
)
1789+
})?
1790+
.sender();
17801791

17811792
let current_tx = tx.take().expect("procedure tx missing during view refresh");
1782-
let (next_tx, call_result) =
1783-
tx_slot.set(current_tx, || Self::call_view(caller, &view_call, view_name, fn_ptr));
1793+
let (next_tx, call_result) = tx_slot.set(current_tx, || {
1794+
Self::call_view(caller, &view_call, view_name, fn_ptr, sender)
1795+
});
17841796
tx = Some(next_tx);
17851797
let return_data = call_result?;
17861798

@@ -1836,6 +1848,7 @@ impl WasmInstanceEnv {
18361848
view_call: &ViewCallInfo,
18371849
view_name: &Identifier,
18381850
fn_ptr: ViewFnPtr,
1851+
sender: Option<Identity>,
18391852
) -> anyhow::Result<ViewReturnData> {
18401853
let prev_func_type = caller
18411854
.data_mut()
@@ -1863,7 +1876,7 @@ impl WasmInstanceEnv {
18631876
call_view_anon,
18641877
view_name,
18651878
fn_ptr.0,
1866-
view_call.sender,
1879+
sender,
18671880
args_source.0,
18681881
result_sink,
18691882
true,

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, HashSet}
3232
use spacetimedb_datastore::db_metrics::DB_METRICS;
3333
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
3434
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
35-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
35+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo};
3636
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
3737
use spacetimedb_durability::TxOffset;
3838
use spacetimedb_execution::ExecutionParams;
@@ -42,7 +42,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
4242
use spacetimedb_lib::Identity;
4343
use spacetimedb_lib::{bsatn, identity::AuthCtx};
4444
use spacetimedb_physical_plan::plan::ProjectPlan;
45-
use spacetimedb_primitives::ArgId;
4645
use spacetimedb_schema::def::RawModuleDefVersion;
4746
use spacetimedb_table::static_assert_size;
4847
use std::{
@@ -1855,7 +1854,7 @@ impl ModuleSubscriptions {
18551854
/// and subsequently downgrades to a read-only transaction.
18561855
///
18571856
/// Unlike [`Self::materialize_views_and_downgrade_tx`] which populates the views' backing tables,
1858-
/// this method just decrements the subscriber count in `st_view_sub`.
1857+
/// this method just decrements the subscriber count in view lifecycle state.
18591858
/// Views without any subscribers are cleaned up async.
18601859
fn unsubscribe_views_and_downgrade_tx(
18611860
&self,
@@ -1869,7 +1868,7 @@ impl ModuleSubscriptions {
18691868
Ok(self.guard_tx(tx, opts))
18701869
}
18711870

1872-
/// We unsubscribe from views by decrementing the subscriber count in `st_view_sub`.
1871+
/// We unsubscribe from views by decrementing the subscriber count in the view lifecycle state.
18731872
/// Views without any subscribers are cleaned up async.
18741873
fn _unsubscribe_views(
18751874
tx: &mut MutTxId,
@@ -1879,7 +1878,13 @@ impl ModuleSubscriptions {
18791878
let mut view_ids = HashSet::new();
18801879
view_collector.collect_views(&mut view_ids);
18811880
for view_id in view_ids {
1882-
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
1881+
let is_anonymous = tx.lookup_st_view(view_id)?.is_anonymous;
1882+
let view_call = if is_anonymous {
1883+
ViewCallInfo::anonymous(view_id)
1884+
} else {
1885+
ViewCallInfo::sender(view_id, sender)
1886+
};
1887+
tx.unsubscribe_view(view_call, sender)?;
18831888
}
18841889
Ok(())
18851890
}

0 commit comments

Comments
 (0)