Skip to content

Commit 51d7d88

Browse files
Fix anonymous view subscription cleanup
1 parent 61be6e6 commit 51d7d88

4 files changed

Lines changed: 318 additions & 51 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,12 +2349,12 @@ mod tests {
23492349
use std::fs::OpenOptions;
23502350
use std::path::PathBuf;
23512351
use std::rc::Rc;
2352-
use std::time::Instant;
2352+
use std::time::{Duration, Instant};
23532353

23542354
use super::tests_utils::begin_mut_tx;
23552355
use super::*;
23562356
use crate::db::relational_db::tests_utils::{
2357-
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
2357+
begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
23582358
};
23592359
use anyhow::bail;
23602360
use bytes::Bytes;
@@ -2518,6 +2518,15 @@ mod tests {
25182518
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
25192519
}
25202520

2521+
fn setup_anonymous_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId)> {
2522+
Ok(create_view_for_test(
2523+
stdb,
2524+
"my_anonymous_view",
2525+
&[("b", AlgebraicType::U8)],
2526+
true,
2527+
)?)
2528+
}
2529+
25212530
fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> {
25222531
let row_pv = |v: u8| product![v];
25232532

@@ -2543,6 +2552,22 @@ mod tests {
25432552
.collect()
25442553
}
25452554

2555+
fn project_anonymous_views(stdb: &TestDB, table_id: TableId) -> Vec<ProductValue> {
2556+
let tx = begin_tx(stdb);
2557+
2558+
stdb.iter(&tx, table_id)
2559+
.unwrap()
2560+
.map(|row| row.to_product_value())
2561+
.collect()
2562+
}
2563+
2564+
fn update_last_called(stdb: &TestDB, view_id: ViewId, sender: Identity, last_called: Timestamp) -> ResultTest<()> {
2565+
let mut tx = begin_mut_tx(stdb);
2566+
tx.update_view_timestamp_at(view_id, ArgId::SENTINEL, sender, last_called)?;
2567+
stdb.commit_tx(tx)?;
2568+
Ok(())
2569+
}
2570+
25462571
#[test]
25472572
fn test_view_tables_are_ephemeral_in_commitlog() -> ResultTest<()> {
25482573
let stdb = TestDB::durable_without_snapshot_repo()?;
@@ -2723,6 +2748,99 @@ mod tests {
27232748

27242749
Ok(())
27252750
}
2751+
2752+
/// Regression test for anonymous-view cleanup.
2753+
///
2754+
/// If one subscriber row has expired but another subscriber for the same anonymous
2755+
/// view is still live, cleanup must delete only the stale bookkeeping row and keep
2756+
/// the shared materialized backing table intact.
2757+
#[test]
2758+
fn test_anonymous_view_cleanup_keeps_rows_for_live_subscribers() -> ResultTest<()> {
2759+
let stdb = TestDB::durable()?;
2760+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2761+
2762+
let stale_sender = Identity::ONE;
2763+
let live_sender = Identity::ZERO;
2764+
2765+
let mut tx = begin_mut_tx(&stdb);
2766+
tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2767+
tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?;
2768+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2769+
stdb.commit_tx(tx)?;
2770+
2771+
let mut tx = begin_mut_tx(&stdb);
2772+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2773+
stdb.commit_tx(tx)?;
2774+
2775+
// Make one row definitely expired without relying on wall-clock sleeps.
2776+
update_last_called(&stdb, view_id, stale_sender, Timestamp::UNIX_EPOCH)?;
2777+
2778+
let mut tx = begin_mut_tx(&stdb);
2779+
tx.update_view_timestamp(view_id, ArgId::SENTINEL, live_sender)?;
2780+
stdb.commit_tx(tx)?;
2781+
2782+
// Cleanup should remove only the stale subscriber row and keep the shared
2783+
// anonymous materialization because another subscriber is still live.
2784+
let mut tx = begin_mut_tx(&stdb);
2785+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2786+
stdb.commit_tx(tx)?;
2787+
2788+
assert_eq!(
2789+
project_anonymous_views(&stdb, table_id),
2790+
vec![product![42u8]],
2791+
"anonymous view rows should survive cleanup while another identity is still subscribed"
2792+
);
2793+
2794+
let tx = begin_mut_tx(&stdb);
2795+
let st_after = tx.lookup_st_view_subs(view_id)?;
2796+
assert_eq!(st_after.len(), 1);
2797+
assert_eq!(st_after[0].identity.0, live_sender);
2798+
assert!(st_after[0].has_subscribers);
2799+
assert_eq!(st_after[0].num_subscribers, 1);
2800+
2801+
Ok(())
2802+
}
2803+
2804+
/// Regression test for anonymous-view cleanup.
2805+
///
2806+
/// Once the final subscriber row for an anonymous view has expired, cleanup must
2807+
/// remove both the stale bookkeeping row and the shared materialized backing table.
2808+
#[test]
2809+
fn test_anonymous_view_cleanup_clears_rows_when_unused() -> ResultTest<()> {
2810+
let stdb = TestDB::durable()?;
2811+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2812+
2813+
let sender = Identity::ONE;
2814+
2815+
let mut tx = begin_mut_tx(&stdb);
2816+
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
2817+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2818+
stdb.commit_tx(tx)?;
2819+
2820+
let mut tx = begin_mut_tx(&stdb);
2821+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
2822+
stdb.commit_tx(tx)?;
2823+
2824+
// Mark the unsubscribed row as expired so cleanup can process it immediately.
2825+
update_last_called(&stdb, view_id, sender, Timestamp::UNIX_EPOCH)?;
2826+
2827+
// With no remaining subscriber rows, cleanup should drop the shared
2828+
// anonymous materialization and remove the bookkeeping row.
2829+
let mut tx = begin_mut_tx(&stdb);
2830+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2831+
stdb.commit_tx(tx)?;
2832+
2833+
assert!(
2834+
project_anonymous_views(&stdb, table_id).is_empty(),
2835+
"anonymous view rows should be cleared once no entries remain"
2836+
);
2837+
2838+
let tx = begin_mut_tx(&stdb);
2839+
let st_after = tx.lookup_st_view_subs(view_id)?;
2840+
assert!(st_after.is_empty());
2841+
2842+
Ok(())
2843+
}
27262844
#[test]
27272845
fn test_table_name() -> ResultTest<()> {
27282846
let stdb = TestDB::durable()?;

crates/core/src/host/module_host.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1884,7 +1884,12 @@ impl ModuleHost {
18841884
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
18851885
let is_anonymous = st_view_row.is_anonymous;
18861886
let sender = if is_anonymous { None } else { Some(caller) };
1887-
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
1887+
let is_materialized = if is_anonymous {
1888+
tx.is_anonymous_view_materialized(view_id)?
1889+
} else {
1890+
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
1891+
};
1892+
if !is_materialized {
18881893
let (res, trapped) =
18891894
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
18901895
tx = res.tx;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 151 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -688,44 +688,7 @@ impl InstanceCommon {
688688
tx: MutTxId,
689689
inst: &mut I,
690690
) -> Result<(ViewCallResult, bool), anyhow::Error> {
691-
let views = self.info.module_def.views().collect::<Vec<_>>();
692-
let owner_identity = self.info.owner_identity;
693-
694-
let mut view_calls = Vec::new();
695-
696-
for view in views {
697-
let ViewDef {
698-
name: view_name,
699-
is_anonymous,
700-
fn_ptr,
701-
product_type_ref,
702-
..
703-
} = view;
704-
705-
let st_view = tx
706-
.view_from_name(view_name)?
707-
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
708-
709-
let view_id = st_view.view_id;
710-
let table_id = st_view
711-
.table_id
712-
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
713-
714-
for sub in tx.lookup_st_view_subs(view_id)? {
715-
view_calls.push(CallViewParams {
716-
view_name: view_name.clone(),
717-
view_id,
718-
table_id,
719-
fn_ptr: *fn_ptr,
720-
caller: owner_identity,
721-
sender: if *is_anonymous { None } else { Some(sub.identity.into()) },
722-
args: ArgsTuple::nullary(),
723-
row_type: *product_type_ref,
724-
timestamp: Timestamp::now(),
725-
});
726-
}
727-
}
728-
691+
let view_calls = collect_subscribed_view_calls(&tx, &self.info.module_def, self.info.owner_identity)?;
729692
Ok(self.execute_view_calls(tx, view_calls, inst))
730693
}
731694

@@ -1388,6 +1351,68 @@ impl InstanceCommon {
13881351
}
13891352
}
13901353

1354+
fn collect_subscribed_view_calls(
1355+
tx: &MutTxId,
1356+
module_def: &ModuleDef,
1357+
owner_identity: Identity,
1358+
) -> Result<Vec<CallViewParams>, anyhow::Error> {
1359+
let mut view_calls = Vec::new();
1360+
1361+
for view in module_def.views() {
1362+
let ViewDef {
1363+
name: view_name,
1364+
is_anonymous,
1365+
fn_ptr,
1366+
product_type_ref,
1367+
..
1368+
} = view;
1369+
1370+
let st_view = tx
1371+
.view_from_name(view_name)?
1372+
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
1373+
1374+
let view_id = st_view.view_id;
1375+
let table_id = st_view
1376+
.table_id
1377+
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
1378+
let subs = tx.lookup_st_view_subs(view_id)?;
1379+
1380+
if *is_anonymous {
1381+
if subs.is_empty() {
1382+
continue;
1383+
}
1384+
view_calls.push(CallViewParams {
1385+
view_name: view_name.clone(),
1386+
view_id,
1387+
table_id,
1388+
fn_ptr: *fn_ptr,
1389+
caller: owner_identity,
1390+
sender: None,
1391+
args: ArgsTuple::nullary(),
1392+
row_type: *product_type_ref,
1393+
timestamp: Timestamp::now(),
1394+
});
1395+
continue;
1396+
}
1397+
1398+
for sub in subs {
1399+
view_calls.push(CallViewParams {
1400+
view_name: view_name.clone(),
1401+
view_id,
1402+
table_id,
1403+
fn_ptr: *fn_ptr,
1404+
caller: owner_identity,
1405+
sender: Some(sub.identity.into()),
1406+
args: ArgsTuple::nullary(),
1407+
row_type: *product_type_ref,
1408+
timestamp: Timestamp::now(),
1409+
});
1410+
}
1411+
}
1412+
1413+
Ok(view_calls)
1414+
}
1415+
13911416
/// Pre-fetched VM metrics counters for all reducers and views in a module.
13921417
/// Anonymous views have lazily fetched metrics counters.
13931418
struct AllVmMetrics {
@@ -1730,3 +1755,91 @@ impl InstanceOp for ProcedureOp {
17301755
FuncCallType::Procedure
17311756
}
17321757
}
1758+
1759+
#[cfg(test)]
1760+
mod tests {
1761+
use super::collect_subscribed_view_calls;
1762+
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
1763+
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
1764+
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
1765+
use spacetimedb_primitives::ArgId;
1766+
use spacetimedb_sats::raw_identifier::RawIdentifier;
1767+
use spacetimedb_schema::def::ModuleDef;
1768+
1769+
fn module_def_for_view(name: &str, is_anonymous: bool) -> ModuleDef {
1770+
let mut builder = RawModuleDefV9Builder::new();
1771+
let name = RawIdentifier::new(name);
1772+
let type_ref = builder.add_algebraic_type(
1773+
[],
1774+
name.clone(),
1775+
AlgebraicType::Product(ProductType::from_iter([("x", AlgebraicType::U8)])),
1776+
true,
1777+
);
1778+
1779+
builder.add_view(
1780+
name.clone(),
1781+
0,
1782+
true,
1783+
is_anonymous,
1784+
ProductType::unit(),
1785+
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
1786+
);
1787+
1788+
builder.finish().try_into().expect("test module def should be valid")
1789+
}
1790+
1791+
/// Regression test for evaluating anonymous views.
1792+
///
1793+
/// Anonymous views have one shared materialization,
1794+
/// so we should only re-evaluate once even if there are multiple subscribers.
1795+
#[test]
1796+
fn test_dedup_anonymous_view_calls() -> anyhow::Result<()> {
1797+
let stdb = TestDB::in_memory()?;
1798+
let module_def = module_def_for_view("anonymous_view", true);
1799+
let view_def = module_def.view("anonymous_view").expect("view should exist");
1800+
1801+
let mut tx = begin_mut_tx(&stdb);
1802+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1803+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1804+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1805+
1806+
// Two subscriber rows exist, but anonymous views should still be reevaluated once
1807+
// because they share a single materialization.
1808+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1809+
1810+
assert_eq!(
1811+
calls.len(),
1812+
1,
1813+
"anonymous views should only be reevaluated once even with multiple subscriber rows"
1814+
);
1815+
assert_eq!(calls[0].view_id, view_id);
1816+
assert_eq!(calls[0].sender, None);
1817+
Ok(())
1818+
}
1819+
1820+
/// Regression test for evaluating sender-scoped views.
1821+
///
1822+
/// These views have separate materializations per sender,
1823+
/// so reevaluation must emit one call per subscribed sender.
1824+
#[test]
1825+
fn test_distinct_sender_scoped_view_calls() -> anyhow::Result<()> {
1826+
let stdb = TestDB::in_memory()?;
1827+
let module_def = module_def_for_view("sender_view", false);
1828+
let view_def = module_def.view("sender_view").expect("view should exist");
1829+
1830+
let mut tx = begin_mut_tx(&stdb);
1831+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1832+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1833+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1834+
1835+
// Sender-backed views keep one materialization per sender, so reevaluation must
1836+
// preserve both callers.
1837+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1838+
let senders: Vec<_> = calls.iter().filter_map(|call| call.sender).collect();
1839+
1840+
assert_eq!(calls.len(), 2, "sender views should still reevaluate once per sender");
1841+
assert!(senders.contains(&Identity::ZERO));
1842+
assert!(senders.contains(&Identity::ONE));
1843+
Ok(())
1844+
}
1845+
}

0 commit comments

Comments
 (0)