From ec21d3bf772c2dcd93f078a4d14b57e698778605 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 8 May 2025 09:29:01 -0700 Subject: [PATCH] Test subscription updates for dml --- .../subscription/module_subscription_actor.rs | 46 +++++++++++++++++++ smoketests/tests/dml.py | 28 +++++++++++ 2 files changed, 74 insertions(+) create mode 100644 smoketests/tests/dml.py diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 142bd51c6d1..a4f0a46d991 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -936,6 +936,7 @@ mod tests { use crate::error::DBError; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; use crate::messages::websocket as ws; + use crate::sql::execute::run; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; use crate::subscription::query::compile_read_only_query; use crate::subscription::TableUpdateType; @@ -1817,6 +1818,51 @@ mod tests { Ok(()) } + /// Test that we receive subscription updates for DML + #[tokio::test] + async fn test_updates_for_dml() -> anyhow::Result<()> { + // Establish a client connection + let (tx, mut rx) = client_connection(client_id_from_u8(1)); + + let db = relational_db()?; + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); + let schema = [("x", AlgebraicType::U8), ("y", AlgebraicType::U8)]; + let t_id = db.create_table_for_test("t", &schema, &[])?; + + // Subscribe to `t` + subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?; + + // Wait to receive the initial subscription message + assert_matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))); + + let schema = ProductType::from([AlgebraicType::U8, AlgebraicType::U8]); + + // Only the owner can invoke DML commands + let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0)); + + run( + &db, + "INSERT INTO t (x, y) VALUES (0, 1)", + auth, + Some(&subs), + &mut vec![], + )?; + + // Client should receive insert + assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 1_u8]], []).await; + + run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), &mut vec![])?; + + // Client should receive update + assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await; + + run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), &mut vec![])?; + + // Client should receive delete + assert_tx_update_for_table(&mut rx, t_id, &schema, [], [product![0_u8, 2_u8]]).await; + Ok(()) + } + /// Test that we do not compress within a [TransactionUpdateMessage]. /// The message itself is compressed before being sent over the wire, /// but we don't care about that for this test. diff --git a/smoketests/tests/dml.py b/smoketests/tests/dml.py new file mode 100644 index 00000000000..d022e20762b --- /dev/null +++ b/smoketests/tests/dml.py @@ -0,0 +1,28 @@ +from .. import Smoketest + +class Dml(Smoketest): + MODULE_CODE = """ +use spacetimedb::{ReducerContext, Table}; + +#[spacetimedb::table(name = t, public)] +pub struct T { + name: String, +} +""" + + def test_subscribe(self): + """Test that we receive subscription updates from DML""" + + # Subscribe to `t` + sub = self.subscribe("SELECT * FROM t", n=2) + + self.spacetime("sql", self.database_identity, "INSERT INTO t (name) VALUES ('Alice')") + self.spacetime("sql", self.database_identity, "INSERT INTO t (name) VALUES ('Bob')") + + self.assertEqual( + sub(), + [ + {"t": {"deletes": [], "inserts": [{"name": "Alice"}]}}, + {"t": {"deletes": [], "inserts": [{"name": "Bob"}]}}, + ], + )