Skip to content

Commit 9608f49

Browse files
Speed up metadata update on unsubscribe (#2880)
1 parent 8d25009 commit 9608f49

1 file changed

Lines changed: 47 additions & 46 deletions

File tree

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 47 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use crate::subscription::delta::eval_delta;
1313
use crate::worker_metrics::WORKER_METRICS;
1414
use hashbrown::hash_map::OccupiedError;
1515
use hashbrown::{HashMap, HashSet};
16-
use itertools::Itertools;
1716
use parking_lot::RwLock;
1817
use prometheus::IntGauge;
1918
use spacetimedb_client_api_messages::websocket::{
@@ -25,7 +24,7 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
2524
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
2625
use spacetimedb_primitives::{ColId, IndexId, TableId};
2726
use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName};
28-
use std::collections::{BTreeMap, BTreeSet};
27+
use std::collections::BTreeMap;
2928
use std::sync::atomic::{AtomicBool, Ordering};
3029
use std::sync::Arc;
3130
use tokio::sync::mpsc;
@@ -92,6 +91,19 @@ impl Plan {
9291
.into_iter()
9392
}
9493

94+
/// Return the search arguments for this query
95+
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> {
96+
let mut args = HashSet::new();
97+
for arg in self
98+
.plans
99+
.iter()
100+
.flat_map(|subscription| subscription.optimized_physical_plan().search_args())
101+
{
102+
args.insert(arg);
103+
}
104+
args.into_iter()
105+
}
106+
95107
/// Returns the plan fragments that comprise this subscription.
96108
/// Will only return one element unless there is a table with multiple RLS rules.
97109
pub fn plans_fragments(&self) -> impl Iterator<Item = &SubscriptionPlan> + '_ {
@@ -194,16 +206,7 @@ impl QueryState {
194206

195207
/// Return the search arguments for this query
196208
fn search_args(&self) -> impl Iterator<Item = (TableId, ColId, AlgebraicValue)> {
197-
let mut args = HashSet::new();
198-
for arg in self
199-
.query
200-
.plans
201-
.iter()
202-
.flat_map(|subscription| subscription.optimized_physical_plan().search_args())
203-
{
204-
args.insert(arg);
205-
}
206-
args.into_iter()
209+
self.query.search_args()
207210
}
208211
}
209212

@@ -237,7 +240,7 @@ pub struct SearchArguments {
237240
/// ```
238241
///
239242
/// This query is parameterized by `s.x`.
240-
params: BTreeSet<(TableId, ColId)>,
243+
cols: HashMap<TableId, HashSet<ColId>>,
241244
/// For each parameter we keep track of its possible values or arguments.
242245
/// These arguments are the different values that clients subscribe with.
243246
///
@@ -250,18 +253,13 @@ pub struct SearchArguments {
250253
///
251254
/// These queries will get parameterized by `t.id`,
252255
/// and we will record the args `3` and `5` in this map.
253-
args: BTreeSet<(TableId, ColId, AlgebraicValue, QueryHash)>,
256+
args: BTreeMap<(TableId, ColId, AlgebraicValue), HashSet<QueryHash>>,
254257
}
255258

256259
impl SearchArguments {
257260
/// Return the column ids by which a table is parameterized
258-
fn search_params_for_table(&self, table_id: TableId) -> impl Iterator<Item = ColId> + '_ {
259-
let lower_bound = (table_id, 0.into());
260-
let upper_bound = (table_id, u16::MAX.into());
261-
self.params
262-
.range(lower_bound..=upper_bound)
263-
.map(|(_, col_id)| col_id)
264-
.cloned()
261+
fn search_params_for_table(&self, table_id: TableId) -> impl Iterator<Item = &ColId> + '_ {
262+
self.cols.get(&table_id).into_iter().flatten()
265263
}
266264

267265
/// Are there queries parameterized by this table and column?
@@ -272,52 +270,55 @@ impl SearchArguments {
272270
col_id: ColId,
273271
search_arg: AlgebraicValue,
274272
) -> impl Iterator<Item = &QueryHash> {
275-
let lower_bound = (table_id, col_id, search_arg.clone(), QueryHash::MIN);
276-
let upper_bound = (table_id, col_id, search_arg, QueryHash::MAX);
277-
self.args.range(lower_bound..upper_bound).map(|(_, _, _, hash)| hash)
273+
self.args.get(&(table_id, col_id, search_arg)).into_iter().flatten()
278274
}
279275

280276
/// Find the queries that need to be evaluated for this row.
281277
fn queries_for_row<'a>(&'a self, table_id: TableId, row: &'a ProductValue) -> impl Iterator<Item = &'a QueryHash> {
282278
self.search_params_for_table(table_id)
283279
.filter_map(|col_id| row.get_field(col_id.idx(), None).ok().map(|arg| (col_id, arg.clone())))
284-
.flat_map(move |(col_id, arg)| self.queries_for_search_arg(table_id, col_id, arg))
280+
.flat_map(move |(col_id, arg)| self.queries_for_search_arg(table_id, *col_id, arg))
285281
}
286282

287283
/// Remove a query hash and its associated data from this container.
288284
/// Note, a query hash may be associated with multiple column ids.
289-
fn remove_query(&mut self, query: &QueryHash) {
285+
fn remove_query(&mut self, query: &Query) {
290286
// Collect the search arguments (table, column, value) for this query
291-
let mut params = self
292-
.args
293-
.iter()
294-
.filter(|(_, _, _, hash)| hash == query)
295-
.map(|(table_id, col_id, _, _)| (*table_id, *col_id))
296-
.dedup()
297-
.collect::<HashSet<_>>();
287+
let mut params = query.search_args().collect::<Vec<_>>();
298288

299289
// Remove the search argument entries for this query
300-
self.args.retain(|(_, _, _, hash)| hash != query);
290+
for key in &params {
291+
if let Some(hashes) = self.args.get_mut(key) {
292+
hashes.remove(&query.hash);
293+
if hashes.is_empty() {
294+
self.args.remove(key);
295+
}
296+
}
297+
}
301298

302-
// Remove column parameters that no longer have any search arguments associated to them
303-
params.retain(|(table_id, col_id)| {
299+
// Keep in `params` only the columns that no longer map to any search arguments
300+
params.retain(|(table_id, col_id, _)| {
304301
self.args
305-
.range(
306-
(*table_id, *col_id, AlgebraicValue::Min, QueryHash::MIN)
307-
..=(*table_id, *col_id, AlgebraicValue::Max, QueryHash::MAX),
308-
)
302+
.range((*table_id, *col_id, AlgebraicValue::Min)..=(*table_id, *col_id, AlgebraicValue::Max))
309303
.next()
310304
.is_none()
311305
});
312306

313-
self.params
314-
.retain(|(table_id, col_id)| !params.contains(&(*table_id, *col_id)));
307+
// Remove columns that no longer map to any search arguments
308+
for (table_id, col_id, _) in params {
309+
if let Some(col_ids) = self.cols.get_mut(&table_id) {
310+
col_ids.remove(&col_id);
311+
if col_ids.is_empty() {
312+
self.cols.remove(&table_id);
313+
}
314+
}
315+
}
315316
}
316317

317318
/// Add a new mapping from search argument to query hash
318319
fn insert_query(&mut self, table_id: TableId, col_id: ColId, arg: AlgebraicValue, query: QueryHash) {
319-
self.args.insert((table_id, col_id, arg, query));
320-
self.params.insert((table_id, col_id));
320+
self.args.entry((table_id, col_id, arg)).or_default().insert(query);
321+
self.cols.entry(table_id).or_default().insert(col_id);
321322
}
322323
}
323324

@@ -693,7 +694,7 @@ impl SubscriptionManager {
693694
fn table_has_search_param(&self, table_id: TableId, col_id: ColId) -> bool {
694695
self.search_args
695696
.search_params_for_table(table_id)
696-
.any(|id| id == col_id)
697+
.any(|id| *id == col_id)
697698
}
698699

699700
fn remove_legacy_subscriptions(&mut self, client: &ClientId) {
@@ -927,7 +928,7 @@ impl SubscriptionManager {
927928
) {
928929
let hash = query.hash();
929930
join_edges.remove_query(query);
930-
search_args.remove_query(&hash);
931+
search_args.remove_query(query);
931932
index_ids.delete_index_ids_for_query(query);
932933
for table_id in query.table_ids() {
933934
if let Entry::Occupied(mut entry) = tables.entry(table_id) {

0 commit comments

Comments
 (0)