Skip to content

Commit ff0bf84

Browse files
Track non-selection tables for join queries in subscription manager
Fixes a bug where the subscription manager would not update its metadata for tables that did not have a filter in a join query. This resulted in row updates not being sent to clients for those tables.
1 parent 398bbec commit ff0bf84

1 file changed

Lines changed: 13 additions & 12 deletions

File tree

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -309,11 +309,13 @@ pub struct SubscriptionManager {
309309
// Queries for which there is at least one subscriber.
310310
queries: HashMap<QueryHash, QueryState>,
311311

312-
// Inverted index from tables to queries that read from them.
313-
// Note, a query is present in either `tables` or `search_args` but not both.
312+
// If a query reads from a table,
313+
// but does not have a simple equality filter on that table,
314+
// we map the table to the query in this inverted index.
314315
tables: IntMap<TableId, HashSet<QueryHash>>,
315316

316-
// For queries that have simple equality filters,
317+
// If a query reads from a table,
318+
// and has a simple equality filter on that table,
317319
// we map the filter values to the query in this lookup table.
318320
search_args: SearchArguments,
319321
}
@@ -629,15 +631,13 @@ impl SubscriptionManager {
629631
// If this is new, we need to update the table to query mapping.
630632
if !query_state.has_subscribers() {
631633
let hash = query_state.query.hash();
632-
let mut has_search_args = false;
634+
let mut table_ids = query_state.query.table_ids().collect::<HashSet<_>>();
633635
for (table_id, col_id, arg) in query_state.search_args() {
634-
has_search_args = true;
636+
table_ids.remove(&table_id);
635637
search_args.insert_query(table_id, col_id, arg, hash);
636638
}
637-
if !has_search_args {
638-
for table_id in query_state.query.table_ids() {
639-
tables.entry(table_id).or_default().insert(hash);
640-
}
639+
for table_id in table_ids {
640+
tables.entry(table_id).or_default().insert(hash);
641641
}
642642
}
643643
}
@@ -714,9 +714,10 @@ impl SubscriptionManager {
714714
{
715715
queries.insert(hash);
716716
}
717-
queries
718-
.into_iter()
719-
.chain(self.tables.get(&table_update.table_id).into_iter().flatten())
717+
for hash in self.tables.get(&table_update.table_id).into_iter().flatten() {
718+
queries.insert(hash);
719+
}
720+
queries.into_iter()
720721
}
721722

722723
/// This method takes a set of delta tables,

0 commit comments

Comments
 (0)