Skip to content

Commit 8a16a12

Browse files
Build indexes over TxData for subscription eval (#2768)
1 parent 1e50c7d commit 8a16a12

12 files changed

Lines changed: 860 additions & 103 deletions

File tree

crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use core::ops::RangeBounds;
2525
use core::{cell::RefCell, mem};
2626
use core::{iter, ops::Bound};
2727
use smallvec::SmallVec;
28-
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore};
28+
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
2929
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics};
3030
use spacetimedb_lib::{
3131
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
@@ -103,6 +103,30 @@ impl DeltaStore for MutTxId {
103103
fn deletes_for_table(&self, _: TableId) -> Option<std::slice::Iter<'_, ProductValue>> {
104104
None
105105
}
106+
107+
/// Subscriptions are currently evaluated using read-only transactions.
108+
/// Hence this will never be called on a mutable transaction.
109+
fn index_scan_range_for_delta(
110+
&self,
111+
_: TableId,
112+
_: IndexId,
113+
_: spacetimedb_lib::query::Delta,
114+
_: impl RangeBounds<AlgebraicValue>,
115+
) -> impl Iterator<Item = Row> {
116+
std::iter::empty()
117+
}
118+
119+
/// Subscriptions are currently evaluated using read-only transactions.
120+
/// Hence this will never be called on a mutable transaction.
121+
fn index_scan_point_for_delta(
122+
&self,
123+
_: TableId,
124+
_: IndexId,
125+
_: spacetimedb_lib::query::Delta,
126+
_: &AlgebraicValue,
127+
) -> impl Iterator<Item = Row> {
128+
std::iter::empty()
129+
}
106130
}
107131

108132
impl MutDatastore for MutTxId {

crates/core/src/db/datastore/traits.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ impl TxData {
218218
self.inserts.iter()
219219
}
220220

221+
/// Get the `i`th inserted row for `table_id` if it exists
222+
pub fn get_ith_insert(&self, table_id: TableId, i: usize) -> Option<&ProductValue> {
223+
self.inserts.get(&table_id).and_then(|rows| rows.get(i))
224+
}
225+
221226
/// Obtain an iterator over the inserted rows per table.
222227
///
223228
/// If you don't need access to the table name, [`Self::inserts`] is
@@ -237,6 +242,11 @@ impl TxData {
237242
self.deletes.iter()
238243
}
239244

245+
/// Get the `i`th deleted row for `table_id` if it exists
246+
pub fn get_ith_delete(&self, table_id: TableId, i: usize) -> Option<&ProductValue> {
247+
self.deletes.get(&table_id).and_then(|rows| rows.get(i))
248+
}
249+
240250
/// Obtain an iterator over the deleted rows per table.
241251
///
242252
/// If you don't need access to the table name, [`Self::deletes`] is

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ impl ModuleSubscriptions {
699699
// Create the delta transaction we'll use to eval updates against.
700700
let delta_read_tx = tx_data
701701
.as_ref()
702-
.map(|tx_data| DeltaTx::new(&read_tx, tx_data))
702+
.map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
703703
.unwrap_or_else(|| DeltaTx::from(&*read_tx));
704704

705705
let event = Arc::new(event);

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 114 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use spacetimedb_client_api_messages::websocket::{
2323
use spacetimedb_data_structures::map::{Entry, IntMap};
2424
use spacetimedb_lib::metrics::ExecutionMetrics;
2525
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
26-
use spacetimedb_primitives::{ColId, TableId};
26+
use spacetimedb_primitives::{ColId, IndexId, TableId};
2727
use spacetimedb_subscription::{SubscriptionPlan, TableName};
2828
use std::collections::BTreeSet;
2929
use std::sync::atomic::{AtomicBool, Ordering};
@@ -78,6 +78,15 @@ impl Plan {
7878
self.plans[0].subscribed_table_name()
7979
}
8080

81+
/// Returns the index ids from which this subscription reads
82+
pub fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> {
83+
self.plans
84+
.iter()
85+
.flat_map(|plan| plan.index_ids())
86+
.collect::<HashSet<_>>()
87+
.into_iter()
88+
}
89+
8190
/// Returns the table ids from which this subscription reads
8291
pub fn table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
8392
self.plans
@@ -174,6 +183,11 @@ impl QueryState {
174183
itertools::chain(&self.legacy_subscribers, &self.subscriptions)
175184
}
176185

186+
/// Return the [`Query`] for this [`QueryState`]
187+
pub fn query(&self) -> &Query {
188+
&self.query
189+
}
190+
177191
/// Return the search arguments for this query
178192
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> {
179193
let mut args = HashSet::new();
@@ -305,6 +319,79 @@ impl SearchArguments {
305319

306320
type ClientsMap = HashMap<ClientId, ClientInfo>;
307321

322+
/// Keeps track of the indexes that are used in subscriptions.
323+
#[derive(Debug, Default)]
324+
pub struct QueriedTableIndexIds {
325+
ids: HashMap<TableId, HashMap<IndexId, usize>>,
326+
}
327+
328+
impl FromIterator<(TableId, IndexId)> for QueriedTableIndexIds {
329+
fn from_iter<T: IntoIterator<Item = (TableId, IndexId)>>(iter: T) -> Self {
330+
let mut index_ids = Self::default();
331+
for (table_id, index_id) in iter {
332+
index_ids.insert_index_id(table_id, index_id);
333+
}
334+
index_ids
335+
}
336+
}
337+
338+
impl QueriedTableIndexIds {
339+
/// Returns the index ids that are used in subscriptions for this table.
340+
/// Note, it does not return all of the index ids that are defined on this table.
341+
/// Only those that are used by at least one subscription query.
342+
pub fn index_ids_for_table(&self, table_id: TableId) -> impl Iterator<Item = IndexId> + '_ {
343+
self.ids
344+
.get(&table_id)
345+
.into_iter()
346+
.flat_map(|index_ids| index_ids.keys())
347+
.copied()
348+
}
349+
350+
/// Insert a new `table_id` `index_id` pair into this container.
351+
/// Note, different queries may read from the same index.
352+
/// Hence we may already be tracking this index, in which case we bump its ref count.
353+
pub fn insert_index_id(&mut self, table_id: TableId, index_id: IndexId) {
354+
*self.ids.entry(table_id).or_default().entry(index_id).or_default() += 1;
355+
}
356+
357+
/// Remove a `table_id` `index_id` pair from this container.
358+
/// Note, different queries may read from the same index.
359+
/// Hence we only remove this key from the map if its ref count goes to zero.
360+
pub fn delete_index_id(&mut self, table_id: TableId, index_id: IndexId) {
361+
if let Some(ids) = self.ids.get_mut(&table_id) {
362+
if let Some(n) = ids.get_mut(&index_id) {
363+
*n -= 1;
364+
365+
if *n == 0 {
366+
ids.remove(&index_id);
367+
368+
if ids.is_empty() {
369+
self.ids.remove(&table_id);
370+
}
371+
}
372+
}
373+
}
374+
}
375+
376+
/// Insert the index ids from which a query reads into this mapping.
377+
/// Note, an index may already be tracked if another query is already using it.
378+
/// In this case we just bump its ref count.
379+
pub fn insert_index_ids_for_query(&mut self, query: &Query) {
380+
for (table_id, index_id) in query.index_ids() {
381+
self.insert_index_id(table_id, index_id);
382+
}
383+
}
384+
385+
/// Delete the index ids from which a query reads from this mapping.
386+
/// Note, we will not remove an index id from this mapping if another query is using it.
387+
/// Instead we decrement its ref count.
388+
pub fn delete_index_ids_for_query(&mut self, query: &Query) {
389+
for (table_id, index_id) in query.index_ids() {
390+
self.delete_index_id(table_id, index_id);
391+
}
392+
}
393+
}
394+
308395
/// Responsible for the efficient evaluation of subscriptions.
309396
/// It performs basic multi-query optimization,
310397
/// in that if a query has N subscribers,
@@ -318,17 +405,21 @@ pub struct SubscriptionManager {
318405
/// in order to dispatch messages to clients.
319406
clients: Arc<RwLock<ClientsMap>>,
320407

321-
// Queries for which there is at least one subscriber.
408+
/// Queries for which there is at least one subscriber.
322409
queries: HashMap<QueryHash, QueryState>,
323410

324-
// If a query reads from a table,
325-
// but does not have a simple equality filter on that table,
326-
// we map the table to the query in this inverted index.
411+
/// If a query reads from a table,
412+
/// but does not have a simple equality filter on that table,
413+
/// we map the table to the query in this inverted index.
327414
tables: IntMap<TableId, HashSet<QueryHash>>,
328415

329-
// If a query reads from a table,
330-
// and has a simple equality filter on that table,
331-
// we map the filter values to the query in this lookup table.
416+
/// Tracks the indexes used across all subscriptions
417+
/// to enable building the appropriate indexes for row updates.
418+
indexes: QueriedTableIndexIds,
419+
420+
/// If a query reads from a table,
421+
/// and has a simple equality filter on that table,
422+
/// we map the filter values to the query in this lookup table.
332423
search_args: SearchArguments,
333424

334425
/// Transmit side of a channel to the manager's [`Self::send_worker`] task.
@@ -418,6 +509,7 @@ impl SubscriptionManager {
418509
Self {
419510
clients: Default::default(),
420511
queries: Default::default(),
512+
indexes: Default::default(),
421513
tables: Default::default(),
422514
search_args: Default::default(),
423515
send_worker_tx,
@@ -506,6 +598,7 @@ impl SubscriptionManager {
506598
if !query_state.has_subscribers() {
507599
SubscriptionManager::remove_query_from_tables(
508600
&mut self.tables,
601+
&mut self.indexes,
509602
&mut self.search_args,
510603
&query_state.query,
511604
);
@@ -572,6 +665,7 @@ impl SubscriptionManager {
572665
if !query_state.has_subscribers() {
573666
SubscriptionManager::remove_query_from_tables(
574667
&mut self.tables,
668+
&mut self.indexes,
575669
&mut self.search_args,
576670
&query_state.query,
577671
);
@@ -639,7 +733,7 @@ impl SubscriptionManager {
639733
.entry(hash)
640734
.or_insert_with(|| QueryState::new(query.clone()));
641735

642-
Self::insert_query(&mut self.tables, &mut self.search_args, query_state);
736+
Self::insert_query(&mut self.tables, &mut self.indexes, &mut self.search_args, query_state);
643737

644738
let entry = ci.subscription_ref_count.entry(hash).or_insert(0);
645739
*entry += 1;
@@ -687,7 +781,7 @@ impl SubscriptionManager {
687781
.queries
688782
.entry(hash)
689783
.or_insert_with(|| QueryState::new(unit.clone()));
690-
Self::insert_query(&mut self.tables, &mut self.search_args, query_state);
784+
Self::insert_query(&mut self.tables, &mut self.indexes, &mut self.search_args, query_state);
691785
query_state.legacy_subscribers.insert(client_id);
692786
}
693787
}
@@ -697,11 +791,13 @@ impl SubscriptionManager {
697791
// This takes a ref to the table map instead of `self` to avoid borrowing issues.
698792
fn remove_query_from_tables(
699793
tables: &mut IntMap<TableId, HashSet<QueryHash>>,
794+
index_ids: &mut QueriedTableIndexIds,
700795
search_args: &mut SearchArguments,
701796
query: &Query,
702797
) {
703798
let hash = query.hash();
704799
search_args.remove_query(&hash);
800+
index_ids.delete_index_ids_for_query(query);
705801
for table_id in query.table_ids() {
706802
if let Entry::Occupied(mut entry) = tables.entry(table_id) {
707803
let hashes = entry.get_mut();
@@ -717,11 +813,13 @@ impl SubscriptionManager {
717813
// This takes a ref to the table map instead of `self` to avoid borrowing issues.
718814
fn insert_query(
719815
tables: &mut IntMap<TableId, HashSet<QueryHash>>,
816+
index_ids: &mut QueriedTableIndexIds,
720817
search_args: &mut SearchArguments,
721818
query_state: &QueryState,
722819
) {
723820
// If this is new, we need to update the table to query mapping.
724821
if !query_state.has_subscribers() {
822+
index_ids.insert_index_ids_for_query(query_state.query());
725823
let hash = query_state.query.hash();
726824
let mut table_ids = query_state.query.table_ids().collect::<HashSet<_>>();
727825
for (table_id, col_id, arg) in query_state.search_args() {
@@ -760,6 +858,7 @@ impl SubscriptionManager {
760858
queries_to_remove.push(*query_hash);
761859
SubscriptionManager::remove_query_from_tables(
762860
&mut self.tables,
861+
&mut self.indexes,
763862
&mut self.search_args,
764863
&query_state.query,
765864
);
@@ -825,6 +924,11 @@ impl SubscriptionManager {
825924
queries.into_iter()
826925
}
827926

927+
/// Returns the index ids that are used in subscription queries
928+
pub fn index_ids_for_subscriptions(&self) -> &QueriedTableIndexIds {
929+
&self.indexes
930+
}
931+
828932
/// This method takes a set of delta tables,
829933
/// evaluates only the necessary queries for those delta tables,
830934
/// and then sends the results to each client.

0 commit comments

Comments
 (0)