Skip to content

Commit 5611e40

Browse files
Fix subscription manager metadata for joins (#2705)
1 parent 7dddee9 commit 5611e40

2 files changed

Lines changed: 114 additions & 14 deletions

File tree

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,34 @@ mod tests {
18021802
assert_eq!(metrics.delta_queries_evaluated, 1);
18031803
assert_eq!(metrics.delta_queries_matched, 1);
18041804

1805+
// Modify a matching row in `u`
1806+
let metrics = commit_tx(
1807+
&db,
1808+
&subs,
1809+
[(u_id, product![1u64, 2u64, 2u64])],
1810+
[(u_id, product![1u64, 2u64, 3u64])],
1811+
)?;
1812+
1813+
assert_tx_update_for_table(
1814+
&mut rx_for_b,
1815+
u_id,
1816+
&ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
1817+
[product![1u64, 2u64, 3u64]],
1818+
[product![1u64, 2u64, 2u64]],
1819+
)
1820+
.await;
1821+
1822+
// We should have evaluated all of the queries
1823+
assert_eq!(metrics.delta_queries_evaluated, 4);
1824+
assert_eq!(metrics.delta_queries_matched, 1);
1825+
1826+
// Insert a non-matching row in `u`
1827+
let metrics = commit_tx(&db, &subs, [], [(u_id, product![3u64, 0u64, 0u64])])?;
1828+
1829+
// We should have evaluated all of the queries
1830+
assert_eq!(metrics.delta_queries_evaluated, 4);
1831+
assert_eq!(metrics.delta_queries_matched, 0);
1832+
18051833
Ok(())
18061834
}
18071835

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -309,11 +309,13 @@ pub struct SubscriptionManager {
309309
// Queries for which there is at least one subscriber.
310310
queries: HashMap<QueryHash, QueryState>,
311311

312-
// Inverted index from tables to queries that read from them.
313-
// Note, a query is present in either `tables` or `search_args` but not both.
312+
// If a query reads from a table,
313+
// but does not have a simple equality filter on that table,
314+
// we map the table to the query in this inverted index.
314315
tables: IntMap<TableId, HashSet<QueryHash>>,
315316

316-
// For queries that have simple equality filters,
317+
// If a query reads from a table,
318+
// and has a simple equality filter on that table,
317319
// we map the filter values to the query in this lookup table.
318320
search_args: SearchArguments,
319321
}
@@ -619,7 +621,7 @@ impl SubscriptionManager {
619621
}
620622

621623
// Update the mapping from table id to related queries by inserting the given query.
622-
// If this query has search arguments, that mapping is updated instead.
624+
// Also add any search arguments the query may have.
623625
// This takes a ref to the table map instead of `self` to avoid borrowing issues.
624626
fn insert_query(
625627
tables: &mut IntMap<TableId, HashSet<QueryHash>>,
@@ -629,15 +631,15 @@ impl SubscriptionManager {
629631
// If this is new, we need to update the table to query mapping.
630632
if !query_state.has_subscribers() {
631633
let hash = query_state.query.hash();
632-
let mut has_search_args = false;
634+
let mut table_ids = query_state.query.table_ids().collect::<HashSet<_>>();
633635
for (table_id, col_id, arg) in query_state.search_args() {
634-
has_search_args = true;
636+
table_ids.remove(&table_id);
635637
search_args.insert_query(table_id, col_id, arg, hash);
636638
}
637-
if !has_search_args {
638-
for table_id in query_state.query.table_ids() {
639-
tables.entry(table_id).or_default().insert(hash);
640-
}
639+
// Update the `tables` map if this query reads from a table,
640+
// but does not have a search argument for that table.
641+
for table_id in table_ids {
642+
tables.entry(table_id).or_default().insert(hash);
641643
}
642644
}
643645
}
@@ -714,9 +716,10 @@ impl SubscriptionManager {
714716
{
715717
queries.insert(hash);
716718
}
717-
queries
718-
.into_iter()
719-
.chain(self.tables.get(&table_update.table_id).into_iter().flatten())
719+
for hash in self.tables.get(&table_update.table_id).into_iter().flatten() {
720+
queries.insert(hash);
721+
}
722+
queries.into_iter()
720723
}
721724

722725
/// This method takes a set of delta tables,
@@ -1519,7 +1522,7 @@ mod tests {
15191522
}
15201523

15211524
#[test]
1522-
fn test_lookup_queries_for_search_arg() -> ResultTest<()> {
1525+
fn test_search_args_for_selects() -> ResultTest<()> {
15231526
let db = TestDB::durable()?;
15241527

15251528
let table_id = create_table(&db, "t")?;
@@ -1581,6 +1584,75 @@ mod tests {
15811584
Ok(())
15821585
}
15831586

1587+
/// Checks which table updates cause a join subscription to be selected by
/// `queries_for_table_update`.
///
/// The subscribed query joins `t` with `s` and filters only on `s.a = 1`.
/// The test asserts that the query's hash is returned for an insert into `t`,
/// returned for an insert into `s` whose `a` column is 1,
/// and not returned for an insert into `s` whose `a` column is 2.
#[test]
1588+
fn test_search_args_for_join() -> ResultTest<()> {
1589+
let db = TestDB::durable()?;
1590+
1591+
// Both tables are created from the same two-column (`id`, `a`) `u8` schema.
let schema = [("id", AlgebraicType::U8), ("a", AlgebraicType::U8)];
1592+
1593+
// NOTE(review): `&[0.into()]` presumably requests an index on column 0 (`id`) - confirm.
let t_id = db.create_table_for_test("t", &schema, &[0.into()])?;
1594+
let s_id = db.create_table_for_test("s", &schema, &[0.into()])?;
1595+
1596+
let client = Arc::new(client(0));
1597+
let mut subscriptions = SubscriptionManager::default();
1598+
1599+
// The only equality filter in this query is on `s`; `t` is read without a filter.
let plan = compile_plan(&db, "select t.* from t join s on t.id = s.id where s.a = 1")?;
1600+
// Keep the plan's hash so it can be compared against the lookup results below.
let hash = plan.hash;
1601+
1602+
subscriptions.add_subscription_multi(client.clone(), vec![plan], QueryId::new(0))?;
1603+
1604+
// Do we need to evaluate the above join query for this table update?
1605+
// Yes, because the above query does not filter on `t`.
1606+
// Therefore we must evaluate it for any update on `t`.
1607+
let table_update = DatabaseTableUpdate {
1608+
table_id: t_id,
1609+
table_name: "t".into(),
1610+
inserts: [product![0u8, 0u8]].into(),
1611+
deletes: [].into(),
1612+
};
1613+
1614+
let hashes = subscriptions
1615+
.queries_for_table_update(&table_update)
1616+
.cloned()
1617+
.collect::<Vec<_>>();
1618+
1619+
assert_eq!(hashes, vec![hash]);
1620+
1621+
// Do we need to evaluate the above join query for this table update?
1622+
// Yes, because `s.a = 1`.
1623+
let table_update = DatabaseTableUpdate {
1624+
table_id: s_id,
1625+
table_name: "s".into(),
1626+
inserts: [product![0u8, 1u8]].into(),
1627+
deletes: [].into(),
1628+
};
1629+
1630+
let hashes = subscriptions
1631+
.queries_for_table_update(&table_update)
1632+
.cloned()
1633+
.collect::<Vec<_>>();
1634+
1635+
assert_eq!(hashes, vec![hash]);
1636+
1637+
// Do we need to evaluate the above join query for this table update?
1638+
// No, because `s.a != 1`.
1639+
let table_update = DatabaseTableUpdate {
1640+
table_id: s_id,
1641+
table_name: "s".into(),
1642+
inserts: [product![0u8, 2u8]].into(),
1643+
deletes: [].into(),
1644+
};
1645+
1646+
let hashes = subscriptions
1647+
.queries_for_table_update(&table_update)
1648+
.cloned()
1649+
.collect::<Vec<_>>();
1650+
1651+
// The inserted row has `a = 2`, so no subscribed query should be selected.
assert!(hashes.is_empty());
1652+
1653+
Ok(())
1654+
}
1655+
15841656
#[test]
15851657
fn test_subscribe_fails_with_duplicate_request_id() -> ResultTest<()> {
15861658
let db = TestDB::durable()?;

0 commit comments

Comments
 (0)