Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 47 additions & 46 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::subscription::delta::eval_delta;
use crate::worker_metrics::WORKER_METRICS;
use hashbrown::hash_map::OccupiedError;
use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use parking_lot::RwLock;
use prometheus::IntGauge;
use spacetimedb_client_api_messages::websocket::{
Expand All @@ -25,7 +24,7 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName};
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -92,6 +91,19 @@ impl Plan {
.into_iter()
}

/// Return the search arguments for this query
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> {
let mut args = HashSet::new();
for arg in self
.plans
.iter()
.flat_map(|subscription| subscription.optimized_physical_plan().search_args())
{
args.insert(arg);
}
args.into_iter()
}

/// Returns the plan fragments that comprise this subscription.
/// Will only return one element unless there is a table with multiple RLS rules.
pub fn plans_fragments(&self) -> impl Iterator<Item = &SubscriptionPlan> + '_ {
Expand Down Expand Up @@ -194,16 +206,7 @@ impl QueryState {

/// Return the search arguments for this query
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> {
let mut args = HashSet::new();
for arg in self
.query
.plans
.iter()
.flat_map(|subscription| subscription.optimized_physical_plan().search_args())
{
args.insert(arg);
}
args.into_iter()
self.query.search_args()
}
}

Expand Down Expand Up @@ -237,7 +240,7 @@ pub struct SearchArguments {
/// ```
///
/// This query is parameterized by `s.x`.
params: BTreeSet<(TableId, ColId)>,
cols: HashMap<TableId, HashSet<ColId>>,
/// For each parameter we keep track of its possible values or arguments.
/// These arguments are the different values that clients subscribe with.
///
Expand All @@ -250,18 +253,13 @@ pub struct SearchArguments {
///
/// These queries will get parameterized by `t.id`,
/// and we will record the args `3` and `5` in this map.
args: BTreeSet<(TableId, ColId, AlgebraicValue, QueryHash)>,
args: BTreeMap<(TableId, ColId, AlgebraicValue), HashSet<QueryHash>>,
}

impl SearchArguments {
/// Return the column ids by which a table is parameterized
fn search_params_for_table(&self, table_id: TableId) -> impl Iterator<Item = ColId> + '_ {
let lower_bound = (table_id, 0.into());
let upper_bound = (table_id, u16::MAX.into());
self.params
.range(lower_bound..=upper_bound)
.map(|(_, col_id)| col_id)
.cloned()
fn search_params_for_table(&self, table_id: TableId) -> impl Iterator<Item = &ColId> + '_ {
self.cols.get(&table_id).into_iter().flatten()
}

/// Are there queries parameterized by this table and column?
Expand All @@ -272,52 +270,55 @@ impl SearchArguments {
col_id: ColId,
search_arg: AlgebraicValue,
) -> impl Iterator<Item = &QueryHash> {
let lower_bound = (table_id, col_id, search_arg.clone(), QueryHash::MIN);
let upper_bound = (table_id, col_id, search_arg, QueryHash::MAX);
self.args.range(lower_bound..upper_bound).map(|(_, _, _, hash)| hash)
self.args.get(&(table_id, col_id, search_arg)).into_iter().flatten()
}

/// Find the queries that need to be evaluated for this row.
fn queries_for_row<'a>(&'a self, table_id: TableId, row: &'a ProductValue) -> impl Iterator<Item = &'a QueryHash> {
self.search_params_for_table(table_id)
.filter_map(|col_id| row.get_field(col_id.idx(), None).ok().map(|arg| (col_id, arg.clone())))
.flat_map(move |(col_id, arg)| self.queries_for_search_arg(table_id, col_id, arg))
.flat_map(move |(col_id, arg)| self.queries_for_search_arg(table_id, *col_id, arg))
}

/// Remove a query hash and its associated data from this container.
/// Note, a query hash may be associated with multiple column ids.
fn remove_query(&mut self, query: &QueryHash) {
fn remove_query(&mut self, query: &Query) {
// Collect the column parameters for this query
let mut params = self
.args
.iter()
.filter(|(_, _, _, hash)| hash == query)
.map(|(table_id, col_id, _, _)| (*table_id, *col_id))
.dedup()
.collect::<HashSet<_>>();
let mut params = query.search_args().collect::<Vec<_>>();

// Remove the search argument entries for this query
self.args.retain(|(_, _, _, hash)| hash != query);
for key in &params {
if let Some(hashes) = self.args.get_mut(key) {
hashes.remove(&query.hash);
if hashes.is_empty() {
self.args.remove(key);
}
}
}

// Remove column parameters that no longer have any search arguments associated to them
params.retain(|(table_id, col_id)| {
// Retain columns that no longer map to any search arguments
params.retain(|(table_id, col_id, _)| {
self.args
.range(
(*table_id, *col_id, AlgebraicValue::Min, QueryHash::MIN)
..=(*table_id, *col_id, AlgebraicValue::Max, QueryHash::MAX),
)
.range((*table_id, *col_id, AlgebraicValue::Min)..=(*table_id, *col_id, AlgebraicValue::Max))
.next()
.is_none()
});

self.params
.retain(|(table_id, col_id)| !params.contains(&(*table_id, *col_id)));
// Remove columns that no longer map to any search arguments
for (table_id, col_id, _) in params {
if let Some(col_ids) = self.cols.get_mut(&table_id) {
col_ids.remove(&col_id);
if col_ids.is_empty() {
self.cols.remove(&table_id);
}
}
}
}

/// Add a new mapping from search argument to query hash
fn insert_query(&mut self, table_id: TableId, col_id: ColId, arg: AlgebraicValue, query: QueryHash) {
self.args.insert((table_id, col_id, arg, query));
self.params.insert((table_id, col_id));
self.args.entry((table_id, col_id, arg)).or_default().insert(query);
self.cols.entry(table_id).or_default().insert(col_id);
}
}

Expand Down Expand Up @@ -693,7 +694,7 @@ impl SubscriptionManager {
fn table_has_search_param(&self, table_id: TableId, col_id: ColId) -> bool {
self.search_args
.search_params_for_table(table_id)
.any(|id| id == col_id)
.any(|id| *id == col_id)
}

fn remove_legacy_subscriptions(&mut self, client: &ClientId) {
Expand Down Expand Up @@ -927,7 +928,7 @@ impl SubscriptionManager {
) {
let hash = query.hash();
join_edges.remove_query(query);
search_args.remove_query(&hash);
search_args.remove_query(query);
index_ids.delete_index_ids_for_query(query);
for table_id in query.table_ids() {
if let Entry::Occupied(mut entry) = tables.entry(table_id) {
Expand Down
Loading