Skip to content

Commit 3ac3ad3

Browse files
Merge branch 'master' into jlarabie/unreal-sdk-2.0
2 parents 8d89c1c + f181fce commit 3ac3ad3

4 files changed

Lines changed: 318 additions & 51 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2349,12 +2349,12 @@ mod tests {
23492349
use std::fs::OpenOptions;
23502350
use std::path::PathBuf;
23512351
use std::rc::Rc;
2352-
use std::time::Instant;
2352+
use std::time::{Duration, Instant};
23532353

23542354
use super::tests_utils::begin_mut_tx;
23552355
use super::*;
23562356
use crate::db::relational_db::tests_utils::{
2357-
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
2357+
begin_tx, create_view_for_test, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
23582358
};
23592359
use anyhow::bail;
23602360
use bytes::Bytes;
@@ -2518,6 +2518,15 @@ mod tests {
25182518
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
25192519
}
25202520

2521+
fn setup_anonymous_view(stdb: &TestDB) -> ResultTest<(ViewId, TableId)> {
2522+
Ok(create_view_for_test(
2523+
stdb,
2524+
"my_anonymous_view",
2525+
&[("b", AlgebraicType::U8)],
2526+
true,
2527+
)?)
2528+
}
2529+
25212530
fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> {
25222531
let row_pv = |v: u8| product![v];
25232532

@@ -2543,6 +2552,22 @@ mod tests {
25432552
.collect()
25442553
}
25452554

2555+
fn project_anonymous_views(stdb: &TestDB, table_id: TableId) -> Vec<ProductValue> {
2556+
let tx = begin_tx(stdb);
2557+
2558+
stdb.iter(&tx, table_id)
2559+
.unwrap()
2560+
.map(|row| row.to_product_value())
2561+
.collect()
2562+
}
2563+
2564+
fn update_last_called(stdb: &TestDB, view_id: ViewId, sender: Identity, last_called: Timestamp) -> ResultTest<()> {
2565+
let mut tx = begin_mut_tx(stdb);
2566+
tx.update_view_timestamp_at(view_id, ArgId::SENTINEL, sender, last_called)?;
2567+
stdb.commit_tx(tx)?;
2568+
Ok(())
2569+
}
2570+
25462571
#[test]
25472572
fn test_view_tables_are_ephemeral_in_commitlog() -> ResultTest<()> {
25482573
let stdb = TestDB::durable_without_snapshot_repo()?;
@@ -2723,6 +2748,99 @@ mod tests {
27232748

27242749
Ok(())
27252750
}
2751+
2752+
/// Regression test for anonymous-view cleanup.
2753+
///
2754+
/// If one subscriber row has expired but another subscriber for the same anonymous
2755+
/// view is still live, cleanup must delete only the stale bookkeeping row and keep
2756+
/// the shared materialized backing table intact.
2757+
#[test]
2758+
fn test_anonymous_view_cleanup_keeps_rows_for_live_subscribers() -> ResultTest<()> {
2759+
let stdb = TestDB::durable()?;
2760+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2761+
2762+
let stale_sender = Identity::ONE;
2763+
let live_sender = Identity::ZERO;
2764+
2765+
let mut tx = begin_mut_tx(&stdb);
2766+
tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2767+
tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?;
2768+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2769+
stdb.commit_tx(tx)?;
2770+
2771+
let mut tx = begin_mut_tx(&stdb);
2772+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, stale_sender)?;
2773+
stdb.commit_tx(tx)?;
2774+
2775+
// Make one row definitely expired without relying on wall-clock sleeps.
2776+
update_last_called(&stdb, view_id, stale_sender, Timestamp::UNIX_EPOCH)?;
2777+
2778+
let mut tx = begin_mut_tx(&stdb);
2779+
tx.update_view_timestamp(view_id, ArgId::SENTINEL, live_sender)?;
2780+
stdb.commit_tx(tx)?;
2781+
2782+
// Cleanup should remove only the stale subscriber row and keep the shared
2783+
// anonymous materialization because another subscriber is still live.
2784+
let mut tx = begin_mut_tx(&stdb);
2785+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2786+
stdb.commit_tx(tx)?;
2787+
2788+
assert_eq!(
2789+
project_anonymous_views(&stdb, table_id),
2790+
vec![product![42u8]],
2791+
"anonymous view rows should survive cleanup while another identity is still subscribed"
2792+
);
2793+
2794+
let tx = begin_mut_tx(&stdb);
2795+
let st_after = tx.lookup_st_view_subs(view_id)?;
2796+
assert_eq!(st_after.len(), 1);
2797+
assert_eq!(st_after[0].identity.0, live_sender);
2798+
assert!(st_after[0].has_subscribers);
2799+
assert_eq!(st_after[0].num_subscribers, 1);
2800+
2801+
Ok(())
2802+
}
2803+
2804+
/// Regression test for anonymous-view cleanup.
2805+
///
2806+
/// Once the final subscriber row for an anonymous view has expired, cleanup must
2807+
/// remove both the stale bookkeeping row and the shared materialized backing table.
2808+
#[test]
2809+
fn test_anonymous_view_cleanup_clears_rows_when_unused() -> ResultTest<()> {
2810+
let stdb = TestDB::durable()?;
2811+
let (view_id, table_id) = setup_anonymous_view(&stdb)?;
2812+
2813+
let sender = Identity::ONE;
2814+
2815+
let mut tx = begin_mut_tx(&stdb);
2816+
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
2817+
stdb.materialize_anonymous_view(&mut tx, table_id, vec![product![42u8]])?;
2818+
stdb.commit_tx(tx)?;
2819+
2820+
let mut tx = begin_mut_tx(&stdb);
2821+
tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?;
2822+
stdb.commit_tx(tx)?;
2823+
2824+
// Mark the unsubscribed row as expired so cleanup can process it immediately.
2825+
update_last_called(&stdb, view_id, sender, Timestamp::UNIX_EPOCH)?;
2826+
2827+
// With no remaining subscriber rows, cleanup should drop the shared
2828+
// anonymous materialization and remove the bookkeeping row.
2829+
let mut tx = begin_mut_tx(&stdb);
2830+
let (_cleaned, _total_expired) = tx.clear_expired_views(Duration::from_secs(1), VIEW_CLEANUP_BUDGET)?;
2831+
stdb.commit_tx(tx)?;
2832+
2833+
assert!(
2834+
project_anonymous_views(&stdb, table_id).is_empty(),
2835+
"anonymous view rows should be cleared once no entries remain"
2836+
);
2837+
2838+
let tx = begin_mut_tx(&stdb);
2839+
let st_after = tx.lookup_st_view_subs(view_id)?;
2840+
assert!(st_after.is_empty());
2841+
2842+
Ok(())
2843+
}
27262844
#[test]
27272845
fn test_table_name() -> ResultTest<()> {
27282846
let stdb = TestDB::durable()?;

crates/core/src/host/module_host.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1884,7 +1884,12 @@ impl ModuleHost {
18841884
let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?;
18851885
let is_anonymous = st_view_row.is_anonymous;
18861886
let sender = if is_anonymous { None } else { Some(caller) };
1887-
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
1887+
let is_materialized = if is_anonymous {
1888+
tx.is_anonymous_view_materialized(view_id)?
1889+
} else {
1890+
tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)?
1891+
};
1892+
if !is_materialized {
18881893
let (res, trapped) =
18891894
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
18901895
tx = res.tx;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 151 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -688,44 +688,7 @@ impl InstanceCommon {
688688
tx: MutTxId,
689689
inst: &mut I,
690690
) -> Result<(ViewCallResult, bool), anyhow::Error> {
691-
let views = self.info.module_def.views().collect::<Vec<_>>();
692-
let owner_identity = self.info.owner_identity;
693-
694-
let mut view_calls = Vec::new();
695-
696-
for view in views {
697-
let ViewDef {
698-
name: view_name,
699-
is_anonymous,
700-
fn_ptr,
701-
product_type_ref,
702-
..
703-
} = view;
704-
705-
let st_view = tx
706-
.view_from_name(view_name)?
707-
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
708-
709-
let view_id = st_view.view_id;
710-
let table_id = st_view
711-
.table_id
712-
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
713-
714-
for sub in tx.lookup_st_view_subs(view_id)? {
715-
view_calls.push(CallViewParams {
716-
view_name: view_name.clone(),
717-
view_id,
718-
table_id,
719-
fn_ptr: *fn_ptr,
720-
caller: owner_identity,
721-
sender: if *is_anonymous { None } else { Some(sub.identity.into()) },
722-
args: ArgsTuple::nullary(),
723-
row_type: *product_type_ref,
724-
timestamp: Timestamp::now(),
725-
});
726-
}
727-
}
728-
691+
let view_calls = collect_subscribed_view_calls(&tx, &self.info.module_def, self.info.owner_identity)?;
729692
Ok(self.execute_view_calls(tx, view_calls, inst))
730693
}
731694

@@ -1370,6 +1333,68 @@ impl InstanceCommon {
13701333
}
13711334
}
13721335

1336+
fn collect_subscribed_view_calls(
1337+
tx: &MutTxId,
1338+
module_def: &ModuleDef,
1339+
owner_identity: Identity,
1340+
) -> Result<Vec<CallViewParams>, anyhow::Error> {
1341+
let mut view_calls = Vec::new();
1342+
1343+
for view in module_def.views() {
1344+
let ViewDef {
1345+
name: view_name,
1346+
is_anonymous,
1347+
fn_ptr,
1348+
product_type_ref,
1349+
..
1350+
} = view;
1351+
1352+
let st_view = tx
1353+
.view_from_name(view_name)?
1354+
.ok_or_else(|| anyhow::anyhow!("view {} not found in database", &view_name))?;
1355+
1356+
let view_id = st_view.view_id;
1357+
let table_id = st_view
1358+
.table_id
1359+
.ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?;
1360+
let subs = tx.lookup_st_view_subs(view_id)?;
1361+
1362+
if *is_anonymous {
1363+
if subs.is_empty() {
1364+
continue;
1365+
}
1366+
view_calls.push(CallViewParams {
1367+
view_name: view_name.clone(),
1368+
view_id,
1369+
table_id,
1370+
fn_ptr: *fn_ptr,
1371+
caller: owner_identity,
1372+
sender: None,
1373+
args: ArgsTuple::nullary(),
1374+
row_type: *product_type_ref,
1375+
timestamp: Timestamp::now(),
1376+
});
1377+
continue;
1378+
}
1379+
1380+
for sub in subs {
1381+
view_calls.push(CallViewParams {
1382+
view_name: view_name.clone(),
1383+
view_id,
1384+
table_id,
1385+
fn_ptr: *fn_ptr,
1386+
caller: owner_identity,
1387+
sender: Some(sub.identity.into()),
1388+
args: ArgsTuple::nullary(),
1389+
row_type: *product_type_ref,
1390+
timestamp: Timestamp::now(),
1391+
});
1392+
}
1393+
}
1394+
1395+
Ok(view_calls)
1396+
}
1397+
13731398
/// Pre-fetched VM metrics counters for all reducers and views in a module.
13741399
/// Anonymous views have lazily fetched metrics counters.
13751400
struct AllVmMetrics {
@@ -1712,3 +1737,91 @@ impl InstanceOp for ProcedureOp {
17121737
FuncCallType::Procedure
17131738
}
17141739
}
1740+
1741+
#[cfg(test)]
1742+
mod tests {
1743+
use super::collect_subscribed_view_calls;
1744+
use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB};
1745+
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder;
1746+
use spacetimedb_lib::{AlgebraicType, Identity, ProductType};
1747+
use spacetimedb_primitives::ArgId;
1748+
use spacetimedb_sats::raw_identifier::RawIdentifier;
1749+
use spacetimedb_schema::def::ModuleDef;
1750+
1751+
fn module_def_for_view(name: &str, is_anonymous: bool) -> ModuleDef {
1752+
let mut builder = RawModuleDefV9Builder::new();
1753+
let name = RawIdentifier::new(name);
1754+
let type_ref = builder.add_algebraic_type(
1755+
[],
1756+
name.clone(),
1757+
AlgebraicType::Product(ProductType::from_iter([("x", AlgebraicType::U8)])),
1758+
true,
1759+
);
1760+
1761+
builder.add_view(
1762+
name.clone(),
1763+
0,
1764+
true,
1765+
is_anonymous,
1766+
ProductType::unit(),
1767+
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
1768+
);
1769+
1770+
builder.finish().try_into().expect("test module def should be valid")
1771+
}
1772+
1773+
/// Regression test for evaluating anonymous views.
1774+
///
1775+
/// Anonymous views have one shared materialization,
1776+
/// so we should only re-evaluate once even if there are multiple subscribers.
1777+
#[test]
1778+
fn test_dedup_anonymous_view_calls() -> anyhow::Result<()> {
1779+
let stdb = TestDB::in_memory()?;
1780+
let module_def = module_def_for_view("anonymous_view", true);
1781+
let view_def = module_def.view("anonymous_view").expect("view should exist");
1782+
1783+
let mut tx = begin_mut_tx(&stdb);
1784+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1785+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1786+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1787+
1788+
// Two subscriber rows exist, but anonymous views should still be reevaluated once
1789+
// because they share a single materialization.
1790+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1791+
1792+
assert_eq!(
1793+
calls.len(),
1794+
1,
1795+
"anonymous views should only be reevaluated once even with multiple subscriber rows"
1796+
);
1797+
assert_eq!(calls[0].view_id, view_id);
1798+
assert_eq!(calls[0].sender, None);
1799+
Ok(())
1800+
}
1801+
1802+
/// Regression test for evaluating sender-scoped views.
1803+
///
1804+
/// These views have separate materializations per sender,
1805+
/// so reevaluation must emit one call per subscribed sender.
1806+
#[test]
1807+
fn test_distinct_sender_scoped_view_calls() -> anyhow::Result<()> {
1808+
let stdb = TestDB::in_memory()?;
1809+
let module_def = module_def_for_view("sender_view", false);
1810+
let view_def = module_def.view("sender_view").expect("view should exist");
1811+
1812+
let mut tx = begin_mut_tx(&stdb);
1813+
let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?;
1814+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?;
1815+
tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?;
1816+
1817+
// Sender-backed views keep one materialization per sender, so reevaluation must
1818+
// preserve both callers.
1819+
let calls = collect_subscribed_view_calls(&tx, &module_def, Identity::ZERO)?;
1820+
let senders: Vec<_> = calls.iter().filter_map(|call| call.sender).collect();
1821+
1822+
assert_eq!(calls.len(), 2, "sender views should still reevaluate once per sender");
1823+
assert!(senders.contains(&Identity::ZERO));
1824+
assert!(senders.contains(&Identity::ONE));
1825+
Ok(())
1826+
}
1827+
}

0 commit comments

Comments
 (0)