Skip to content

Commit 75d35d3

Browse files
Fix realtime update for views (#3747)
# Description of Changes View tables have private metadata columns that need to be dropped before sending results to clients. Before this patch we dropped these columns for sql queries and initial subscriptions, but we didn't drop them after incremental update which is what this patch does. # API and ABI breaking changes None # Expected complexity level and risk 1 # Testing - [x] Smoketest
1 parent fd524cf commit 75d35d3

5 files changed

Lines changed: 57 additions & 11 deletions

File tree

crates/core/src/subscription/delta.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use anyhow::Result;
22
use hashbrown::HashMap;
3-
use spacetimedb_execution::{Datastore, DeltaStore};
3+
use spacetimedb_execution::{Datastore, DeltaStore, Row};
44
use spacetimedb_lib::metrics::ExecutionMetrics;
5+
use spacetimedb_primitives::ColList;
6+
use spacetimedb_sats::product_value::InvalidFieldError;
57
use spacetimedb_subscription::SubscriptionPlan;
68
use spacetimedb_vm::relation::RelValue;
79

@@ -30,16 +32,26 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
3032
let mut duplicate_rows_evaluated = 0;
3133
let mut duplicate_rows_sent = 0;
3234

35+
let col_list = ColList::from_iter(plan.num_private_cols()..plan.num_cols());
36+
37+
let maybe_project = |row: Row<'a>| -> Result<RelValue<'a>, InvalidFieldError> {
38+
if plan.is_view() {
39+
Ok(row.project_product(&col_list)?.into())
40+
} else {
41+
Ok(row.into())
42+
}
43+
};
44+
3345
if !plan.is_join() {
3446
// Single table plans will never return redundant rows,
3547
// so there's no need to track row counts.
3648
plan.for_each_insert(tx, metrics, &mut |row| {
37-
inserts.push(row.into());
49+
inserts.push(maybe_project(row)?);
3850
Ok(())
3951
})?;
4052

4153
plan.for_each_delete(tx, metrics, &mut |row| {
42-
deletes.push(row.into());
54+
deletes.push(maybe_project(row)?);
4355
Ok(())
4456
})?;
4557
} else {
@@ -49,6 +61,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
4961
let mut delete_counts = HashMap::new();
5062

5163
plan.for_each_insert(tx, metrics, &mut |row| {
64+
let row = maybe_project(row)?;
5265
let n = insert_counts.entry(row).or_default();
5366
if *n > 0 {
5467
duplicate_rows_evaluated += 1;
@@ -58,6 +71,7 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
5871
})?;
5972

6073
plan.for_each_delete(tx, metrics, &mut |row| {
74+
let row = maybe_project(row)?;
6175
match insert_counts.get_mut(&row) {
6276
// We have not seen an insert for this row.
6377
// If we have seen a delete, increment the metric.
@@ -93,11 +107,11 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
93107

94108
for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
95109
duplicate_rows_sent += n as u64 - 1;
96-
inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from));
110+
inserts.extend(std::iter::repeat_n(row, n));
97111
}
98112
for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
99113
duplicate_rows_sent += n as u64 - 1;
100-
deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from));
114+
deletes.extend(std::iter::repeat_n(row, n));
101115
}
102116
}
103117

crates/execution/src/lib.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use spacetimedb_lib::{
1111
AlgebraicValue, ProductValue,
1212
};
1313
use spacetimedb_physical_plan::plan::{ProjectField, TupleField};
14-
use spacetimedb_primitives::{IndexId, TableId};
14+
use spacetimedb_primitives::{ColList, IndexId, TableId};
15+
use spacetimedb_sats::product_value::InvalidFieldError;
1516
use spacetimedb_table::{static_assert_size, table::RowRef};
1617

1718
pub mod dml;
@@ -121,6 +122,13 @@ impl Row<'_> {
121122
Self::Ref(val) => (*val).clone(),
122123
}
123124
}
125+
126+
pub fn project_product(self, cols: &ColList) -> Result<ProductValue, InvalidFieldError> {
127+
match self {
128+
Self::Ptr(ptr) => ptr.project_product(cols),
129+
Self::Ref(val) => val.project_product(cols),
130+
}
131+
}
124132
}
125133

126134
impl_serialize!(['a] Row<'a>, (self, ser) => match self {

crates/subscription/src/lib.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,29 @@ impl SubscriptionPlan {
375375
self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
376376
}
377377

378+
/// Does this plan return rows from a view?
379+
pub fn is_view(&self) -> bool {
380+
self.plan_opt.returns_view_table()
381+
}
382+
383+
/// The number of columns returned.
384+
/// Only relevant if [`Self::is_view`] is true.
385+
pub fn num_cols(&self) -> usize {
386+
self.plan_opt
387+
.return_table()
388+
.map(|schema| schema.num_cols())
389+
.unwrap_or_default()
390+
}
391+
392+
/// The number of private columns returned.
393+
/// Only relevant if [`Self::is_view`] is true.
394+
pub fn num_private_cols(&self) -> usize {
395+
self.plan_opt
396+
.return_table()
397+
.map(|schema| schema.num_private_cols())
398+
.unwrap_or_default()
399+
}
400+
378401
/// To which table does this plan subscribe?
379402
pub fn subscribed_table_id(&self) -> TableId {
380403
self.return_id

crates/vm/src/relation.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use core::hash::{Hash, Hasher};
2+
use derive_more::From;
23
use spacetimedb_execution::Row;
34
use spacetimedb_lib::db::auth::StAccess;
45
use spacetimedb_sats::bsatn::{ser::BsatnError, ToBsatn};
@@ -15,7 +16,7 @@ use std::sync::Arc;
1516
/// or an ephemeral row constructed during query execution.
1617
///
1718
/// A `RelValue` is the type generated/consumed by queries.
18-
#[derive(Debug, Clone)]
19+
#[derive(Debug, Clone, From)]
1920
pub enum RelValue<'a> {
2021
/// A reference to a row in a table.
2122
Row(RowRef<'a>),

smoketests/tests/views.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -551,19 +551,19 @@ class SubscribeViews(Smoketest):
551551
}
552552
"""
553553

554-
def _test_subscribing_with_different_identities(self):
554+
def test_subscribing_with_different_identities(self):
555555
"""Tests different clients subscribing to a client-specific view"""
556556

557557
# Insert an identity for Alice
558558
self.call("insert_player", "Alice")
559559

560-
# Generate and insert a new identity for Bob
560+
# Generate a new identity for Bob
561561
self.reset_config()
562562
self.new_identity()
563-
self.call("insert_player", "Bob")
564563

565564
# Subscribe to `my_player` as Bob
566-
sub = self.subscribe("select * from my_player", n=0)
565+
sub = self.subscribe("select * from my_player", n=1)
566+
self.call("insert_player", "Bob")
567567
events = sub()
568568

569569
# Project out the identity field.

0 commit comments

Comments
 (0)