Skip to content

Commit 398bbec

Browse files
add test for tracking non-selection tables in joins
1 parent 7dddee9 commit 398bbec

2 files changed

Lines changed: 98 additions & 1 deletion

File tree

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,34 @@ mod tests {
18021802
assert_eq!(metrics.delta_queries_evaluated, 1);
18031803
assert_eq!(metrics.delta_queries_matched, 1);
18041804

1805+
// Modify a matching row in `u`
1806+
let metrics = commit_tx(
1807+
&db,
1808+
&subs,
1809+
[(u_id, product![1u64, 2u64, 2u64])],
1810+
[(u_id, product![1u64, 2u64, 3u64])],
1811+
)?;
1812+
1813+
assert_tx_update_for_table(
1814+
&mut rx_for_b,
1815+
u_id,
1816+
&ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
1817+
[product![1u64, 2u64, 3u64]],
1818+
[product![1u64, 2u64, 2u64]],
1819+
)
1820+
.await;
1821+
1822+
// We should have evaluated all of the queries
1823+
assert_eq!(metrics.delta_queries_evaluated, 4);
1824+
assert_eq!(metrics.delta_queries_matched, 1);
1825+
1826+
// Insert a non-matching row in `u`
1827+
let metrics = commit_tx(&db, &subs, [], [(u_id, product![3u64, 0u64, 0u64])])?;
1828+
1829+
// We should have evaluated all of the queries
1830+
assert_eq!(metrics.delta_queries_evaluated, 4);
1831+
assert_eq!(metrics.delta_queries_matched, 0);
1832+
18051833
Ok(())
18061834
}
18071835

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1519,7 +1519,7 @@ mod tests {
15191519
}
15201520

15211521
#[test]
1522-
fn test_lookup_queries_for_search_arg() -> ResultTest<()> {
1522+
fn test_search_args_for_selects() -> ResultTest<()> {
15231523
let db = TestDB::durable()?;
15241524

15251525
let table_id = create_table(&db, "t")?;
@@ -1581,6 +1581,75 @@ mod tests {
15811581
Ok(())
15821582
}
15831583

1584+
#[test]
1585+
fn test_search_args_for_join() -> ResultTest<()> {
1586+
let db = TestDB::durable()?;
1587+
1588+
let schema = [("id", AlgebraicType::U8), ("a", AlgebraicType::U8)];
1589+
1590+
let t_id = db.create_table_for_test("t", &schema, &[0.into()])?;
1591+
let s_id = db.create_table_for_test("s", &schema, &[0.into()])?;
1592+
1593+
let client = Arc::new(client(0));
1594+
let mut subscriptions = SubscriptionManager::default();
1595+
1596+
let plan = compile_plan(&db, "select t.* from t join s on t.id = s.id where s.a = 1")?;
1597+
let hash = plan.hash;
1598+
1599+
subscriptions.add_subscription_multi(client.clone(), vec![plan], QueryId::new(0))?;
1600+
1601+
// Do we need to evaluate the above join query for this table update?
1602+
// Yes, because the above query does not filter on `t`.
1603+
// Therefore we must evaluate it for any update on `t`.
1604+
let table_update = DatabaseTableUpdate {
1605+
table_id: t_id,
1606+
table_name: "t".into(),
1607+
inserts: [product![0u8, 0u8]].into(),
1608+
deletes: [].into(),
1609+
};
1610+
1611+
let hashes = subscriptions
1612+
.queries_for_table_update(&table_update)
1613+
.cloned()
1614+
.collect::<Vec<_>>();
1615+
1616+
assert_eq!(hashes, vec![hash]);
1617+
1618+
// Do we need to evaluate the above join query for this table update?
1619+
// Yes, because `s.a = 1`.
1620+
let table_update = DatabaseTableUpdate {
1621+
table_id: s_id,
1622+
table_name: "s".into(),
1623+
inserts: [product![0u8, 1u8]].into(),
1624+
deletes: [].into(),
1625+
};
1626+
1627+
let hashes = subscriptions
1628+
.queries_for_table_update(&table_update)
1629+
.cloned()
1630+
.collect::<Vec<_>>();
1631+
1632+
assert_eq!(hashes, vec![hash]);
1633+
1634+
// Do we need to evaluate the above join query for this table update?
1635+
// No, because `s.a != 1`.
1636+
let table_update = DatabaseTableUpdate {
1637+
table_id: s_id,
1638+
table_name: "s".into(),
1639+
inserts: [product![0u8, 2u8]].into(),
1640+
deletes: [].into(),
1641+
};
1642+
1643+
let hashes = subscriptions
1644+
.queries_for_table_update(&table_update)
1645+
.cloned()
1646+
.collect::<Vec<_>>();
1647+
1648+
assert!(hashes.is_empty());
1649+
1650+
Ok(())
1651+
}
1652+
15841653
#[test]
15851654
fn test_subscribe_fails_with_duplicate_request_id() -> ResultTest<()> {
15861655
let db = TestDB::durable()?;

0 commit comments

Comments
 (0)