From 047a56ccddff10f23d79be61c225b06f6a473192 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 24 Nov 2025 12:28:45 -0800 Subject: [PATCH 1/3] Drop private columns on incremental update for view subscriptions --- crates/core/src/subscription/delta.rs | 24 +++++++++++++---- crates/execution/src/lib.rs | 10 ++++++- crates/subscription/src/lib.rs | 38 +++++++++++++++++++++++++++ crates/vm/src/relation.rs | 3 ++- 4 files changed, 68 insertions(+), 7 deletions(-) diff --git a/crates/core/src/subscription/delta.rs b/crates/core/src/subscription/delta.rs index 2fc484c4c65..744e4986f11 100644 --- a/crates/core/src/subscription/delta.rs +++ b/crates/core/src/subscription/delta.rs @@ -1,7 +1,9 @@ use anyhow::Result; use hashbrown::HashMap; -use spacetimedb_execution::{Datastore, DeltaStore}; +use spacetimedb_execution::{Datastore, DeltaStore, Row}; use spacetimedb_lib::metrics::ExecutionMetrics; +use spacetimedb_primitives::ColList; +use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_subscription::SubscriptionPlan; use spacetimedb_vm::relation::RelValue; @@ -30,16 +32,26 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( let mut duplicate_rows_evaluated = 0; let mut duplicate_rows_sent = 0; + let col_list = ColList::from_iter(plan.num_private_cols()..plan.num_cols()); + + let maybe_project = |row: Row<'a>| -> Result, InvalidFieldError> { + if plan.is_view() { + Ok(row.project_product(&col_list)?.into()) + } else { + Ok(row.into()) + } + }; + if !plan.is_join() { // Single table plans will never return redundant rows, // so there's no need to track row counts. plan.for_each_insert(tx, metrics, &mut |row| { - inserts.push(row.into()); + inserts.push(maybe_project(row)?); Ok(()) })?; plan.for_each_delete(tx, metrics, &mut |row| { - deletes.push(row.into()); + deletes.push(maybe_project(row)?); Ok(()) })?; } else { @@ -49,6 +61,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( let mut delete_counts = HashMap::new(); plan.for_each_insert(tx, metrics, &mut |row| { + let row = maybe_project(row)?; let n = insert_counts.entry(row).or_default(); if *n > 0 { duplicate_rows_evaluated += 1; @@ -58,6 +71,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( })?; plan.for_each_delete(tx, metrics, &mut |row| { + let row = maybe_project(row)?; match insert_counts.get_mut(&row) { // We have not seen an insert for this row. // If we have seen a delete, increment the metric. @@ -93,11 +107,11 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>( for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) { duplicate_rows_sent += n as u64 - 1; - inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from)); + inserts.extend(std::iter::repeat_n(row, n)); } for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) { duplicate_rows_sent += n as u64 - 1; - deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from)); + deletes.extend(std::iter::repeat_n(row, n)); } } diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index b20563f3c4c..79d782272b9 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -11,7 +11,8 @@ use spacetimedb_lib::{ AlgebraicValue, ProductValue, }; use spacetimedb_physical_plan::plan::{ProjectField, TupleField}; -use spacetimedb_primitives::{IndexId, TableId}; +use spacetimedb_primitives::{ColList, IndexId, TableId}; +use spacetimedb_sats::product_value::InvalidFieldError; use spacetimedb_table::{static_assert_size, table::RowRef}; pub mod dml; @@ -121,6 +122,13 @@ impl Row<'_> { Self::Ref(val) => (*val).clone(), } } + + pub fn project_product(self, cols: &ColList) -> Result { + match self { + Self::Ptr(ptr) => ptr.project_product(cols), + Self::Ref(val) => val.project_product(cols), + } + } } impl_serialize!(['a] Row<'a>, (self, ser) => match self { diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index 3405c9cf10a..a626fd8ee09 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -354,6 +354,14 @@ pub struct SubscriptionPlan { return_id: TableId, /// To which table are we subscribed? return_name: TableName, + /// Are we subscribed to a view? + is_view: bool, + /// The number of columns returned. + /// Only relevant for views. + num_cols: usize, + /// The number of private columns returned. + /// Only relevant for views. + num_private_cols: usize, /// A subscription can read from multiple tables. /// From which tables do we read? table_ids: Vec, @@ -375,6 +383,23 @@ impl SubscriptionPlan { self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1 } + /// Does this plan return rows from a view? + pub fn is_view(&self) -> bool { + self.is_view + } + + /// The number of columns returned. + /// Only relevant if [`Self::is_view`] is true. + pub fn num_cols(&self) -> usize { + self.num_cols + } + + /// The number of private columns returned. + /// Only relevant if [`Self::is_view`] is true. + pub fn num_private_cols(&self) -> usize { + self.num_private_cols + } + /// To which table does this plan subscribe? pub fn subscribed_table_id(&self) -> TableId { self.return_id @@ -533,6 +558,16 @@ impl SubscriptionPlan { bail!("Subscriptions require indexes on join columns") } + let is_view = plan_opt.returns_view_table(); + let num_cols = plan_opt + .return_table() + .map(|schema| schema.num_cols()) + .unwrap_or_default(); + let num_private_cols = plan_opt + .return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default(); + let (table_ids, table_aliases) = table_ids_for_plan(&plan); let fragments = Fragments::compile_from_plan(&plan, &table_aliases, auth)?; @@ -540,6 +575,9 @@ impl SubscriptionPlan { subscriptions.push(Self { return_id, return_name: return_name.clone(), + is_view, + num_cols, + num_private_cols, table_ids, plan_opt, fragments, diff --git a/crates/vm/src/relation.rs b/crates/vm/src/relation.rs index 1c96413e986..2d625369f87 100644 --- a/crates/vm/src/relation.rs +++ b/crates/vm/src/relation.rs @@ -1,4 +1,5 @@ use core::hash::{Hash, Hasher}; +use derive_more::From; use spacetimedb_execution::Row; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_sats::bsatn::{ser::BsatnError, ToBsatn}; @@ -15,7 +16,7 @@ use std::sync::Arc; /// or an ephemeral row constructed during query execution. /// /// A `RelValue` is the type generated/consumed by queries. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, From)] pub enum RelValue<'a> { /// A reference to a row in a table. Row(RowRef<'a>), From adf0ad9d285fcc185157ad9d8f2a1fa8959f1755 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 24 Nov 2025 12:44:42 -0800 Subject: [PATCH 2/3] update test --- smoketests/tests/views.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/smoketests/tests/views.py b/smoketests/tests/views.py index 2b5a9de2e34..483f8bab4b3 100644 --- a/smoketests/tests/views.py +++ b/smoketests/tests/views.py @@ -551,19 +551,19 @@ class SubscribeViews(Smoketest): } """ - def _test_subscribing_with_different_identities(self): + def test_subscribing_with_different_identities(self): """Tests different clients subscribing to a client-specific view""" # Insert an identity for Alice self.call("insert_player", "Alice") - # Generate and insert a new identity for Bob + # Generate a new identity for Bob self.reset_config() self.new_identity() - self.call("insert_player", "Bob") # Subscribe to `my_player` as Bob - sub = self.subscribe("select * from my_player", n=0) + sub = self.subscribe("select * from my_player", n=1) + self.call("insert_player", "Bob") events = sub() # Project out the identity field. From a84be71d0ec880ac660b96ddf9fd872bbff74868 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Mon, 24 Nov 2025 13:28:56 -0800 Subject: [PATCH 3/3] no new fields --- crates/subscription/src/lib.rs | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index a626fd8ee09..d0f4668459b 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -354,14 +354,6 @@ pub struct SubscriptionPlan { return_id: TableId, /// To which table are we subscribed? return_name: TableName, - /// Are we subscribed to a view? - is_view: bool, - /// The number of columns returned. - /// Only relevant for views. - num_cols: usize, - /// The number of private columns returned. - /// Only relevant for views. - num_private_cols: usize, /// A subscription can read from multiple tables. /// From which tables do we read? table_ids: Vec, @@ -385,19 +377,25 @@ impl SubscriptionPlan { /// Does this plan return rows from a view? pub fn is_view(&self) -> bool { - self.is_view + self.plan_opt.returns_view_table() } /// The number of columns returned. /// Only relevant if [`Self::is_view`] is true. pub fn num_cols(&self) -> usize { - self.num_cols + self.plan_opt + .return_table() + .map(|schema| schema.num_cols()) + .unwrap_or_default() } /// The number of private columns returned. /// Only relevant if [`Self::is_view`] is true. pub fn num_private_cols(&self) -> usize { - self.num_private_cols + self.plan_opt + .return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default() } /// To which table does this plan subscribe? @@ -558,16 +556,6 @@ impl SubscriptionPlan { bail!("Subscriptions require indexes on join columns") } - let is_view = plan_opt.returns_view_table(); - let num_cols = plan_opt - .return_table() - .map(|schema| schema.num_cols()) - .unwrap_or_default(); - let num_private_cols = plan_opt - .return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - let (table_ids, table_aliases) = table_ids_for_plan(&plan); let fragments = Fragments::compile_from_plan(&plan, &table_aliases, auth)?; @@ -575,9 +563,6 @@ impl SubscriptionPlan { subscriptions.push(Self { return_id, return_name: return_name.clone(), - is_view, - num_cols, - num_private_cols, table_ids, plan_opt, fragments,