Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where
})
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
.map(|(sql, plan, table_id, table_name)| {
plan.physical_plan()
plan.optimized_physical_plan()
.clone()
.optimize()
.map(|plan| (sql, PipelinedProject::from(plan)))
Expand Down
264 changes: 260 additions & 4 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl ModuleSubscriptions {
tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
auth,
Expand All @@ -212,7 +212,7 @@ impl ModuleSubscriptions {

let plans = query
.plans_fragments()
.map(|fragment| fragment.physical_plan())
.map(|fragment| fragment.optimized_physical_plan())
.cloned()
.map(|plan| plan.optimize())
.collect::<Result<Vec<_>, _>>()?
Expand Down Expand Up @@ -244,7 +244,7 @@ impl ModuleSubscriptions {
tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
auth,
Expand Down Expand Up @@ -686,7 +686,7 @@ impl ModuleSubscriptions {
&tx,
|plan, tx| {
plan.plans_fragments()
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.physical_plan()))
.map(|plan_fragment| estimate_rows_scanned(tx, plan_fragment.optimized_physical_plan()))
.fold(0, |acc, rows_scanned| acc.saturating_add(rows_scanned))
},
&auth,
Expand Down Expand Up @@ -1933,6 +1933,262 @@ mod tests {
Ok(())
}

/// Test that we do not evaluate queries that we know will not match row updates
#[tokio::test]
async fn test_join_pruning() -> anyhow::Result<()> {
let (tx, mut rx) = client_connection(client_id_from_u8(1));

let db = relational_db()?;
let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

let u_id = db.create_table_for_test_with_the_works(
"u",
&[
("i", AlgebraicType::U64),
("a", AlgebraicType::U64),
("b", AlgebraicType::U64),
],
&[0.into()],
&[0.into()],
StAccess::Public,
)?;
let v_id = db.create_table_for_test_with_the_works(
"v",
&[
("i", AlgebraicType::U64),
("x", AlgebraicType::U64),
("y", AlgebraicType::U64),
],
&[0.into(), 1.into()],
&[0.into()],
StAccess::Public,
)?;

let schema = ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]);

commit_tx(
&db,
&subs,
[],
[
(v_id, product![1u64, 1u64, 1u64]),
(v_id, product![2u64, 2u64, 2u64]),
(v_id, product![3u64, 3u64, 3u64]),
(v_id, product![4u64, 4u64, 4u64]),
(v_id, product![5u64, 5u64, 5u64]),
],
)?;

let mut query_ids = 0;

subscribe_multi(
&subs,
&[
"select u.* from u join v on u.i = v.i where v.x = 1",
"select u.* from u join v on u.i = v.i where v.x = 2",
"select u.* from u join v on u.i = v.i where v.x = 3",
"select u.* from u join v on u.i = v.i where v.x = 4",
"select u.* from u join v on u.i = v.i where v.x = 5",
],
tx,
&mut query_ids,
)?;

assert_matches!(
rx.recv().await,
Some(SerializableMessage::Subscription(SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}))
);

// Insert a new row into `u` that joins with `x = 1`
let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?;

assert_tx_update_for_table(&mut rx, u_id, &schema, [product![1u64, 2u64, 3u64]], []).await;

// We should only have evaluated a single query
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);

// UPDATE v SET y = 2 WHERE id = 1
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![1u64, 1u64, 1u64])],
[(v_id, product![1u64, 1u64, 2u64])],
)?;

// We should only have evaluated a single query
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 0);

// UPDATE v SET x = 2 WHERE id = 1
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![1u64, 1u64, 2u64])],
[(v_id, product![1u64, 2u64, 2u64])],
)?;

// Results in a no-op
assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;

// We should have evaluated queries for `x = 1` and `x = 2`
assert_eq!(metrics.delta_queries_evaluated, 2);
assert_eq!(metrics.delta_queries_matched, 2);

// Insert new row into `u` that joins with `x = 3`
// UPDATE v SET x = 4 WHERE id = 3
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![3u64, 3u64, 3u64])],
[(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])],
)?;

assert_tx_update_for_table(&mut rx, u_id, &schema, [product![3u64, 4u64, 5u64]], []).await;

// We should have evaluated queries for `x = 3` and `x = 4`
assert_eq!(metrics.delta_queries_evaluated, 2);
assert_eq!(metrics.delta_queries_matched, 2);

// UPDATE v SET x = 0 WHERE id = 3
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![3u64, 4u64, 3u64])],
[(v_id, product![3u64, 0u64, 3u64])],
)?;

assert_tx_update_for_table(&mut rx, u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await;

// We should only have evaluated the query for `x = 4`
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);

// Insert new row into `u` that joins with `x = 5`
// UPDATE v SET x = 6 WHERE id = 5
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![5u64, 5u64, 5u64])],
[(v_id, product![5u64, 6u64, 6u64]), (u_id, product![5u64, 6u64, 7u64])],
)?;

// Results in a no-op
assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await;

// We should only have evaluated the query for `x = 5`
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);

Ok(())
}

/// Test that one client unsubscribing does not affect another
#[tokio::test]
async fn test_unsubscribe() -> anyhow::Result<()> {
// Establish a connection for each client
let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1));
let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2));

let db = relational_db()?;
let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone());

let u_id = db.create_table_for_test(
"u",
&[
("i", AlgebraicType::U64),
("a", AlgebraicType::U64),
("b", AlgebraicType::U64),
],
&[0.into()],
)?;
let v_id = db.create_table_for_test(
"v",
&[
("i", AlgebraicType::U64),
("x", AlgebraicType::U64),
("y", AlgebraicType::U64),
],
&[0.into(), 1.into()],
)?;

commit_tx(&db, &subs, [], [(v_id, product![1u64, 1u64, 1u64])])?;

let mut query_ids = 0;

subscribe_multi(
&subs,
&["select u.* from u join v on u.i = v.i where v.x = 1"],
tx_for_a,
&mut query_ids,
)?;
subscribe_multi(
&subs,
&["select u.* from u join v on u.i = v.i where v.x = 1"],
tx_for_b.clone(),
&mut query_ids,
)?;

// Wait for both subscriptions
assert_matches!(
rx_for_a.recv().await,
Some(SerializableMessage::Subscription(SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}))
);
assert_matches!(
rx_for_b.recv().await,
Some(SerializableMessage::Subscription(SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}))
);

unsubscribe_multi(&subs, tx_for_b, query_ids)?;

assert_matches!(
rx_for_b.recv().await,
Some(SerializableMessage::Subscription(SubscriptionMessage {
result: SubscriptionResult::UnsubscribeMulti(_),
..
}))
);

// Insert a new row into `u`
let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?;

assert_tx_update_for_table(
&mut rx_for_a,
u_id,
&ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]),
[product![1u64, 0u64, 0u64]],
[],
)
.await;

// We should only have evaluated a single query
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);

// Modify a matching row in `v`
let metrics = commit_tx(
&db,
&subs,
[(v_id, product![1u64, 1u64, 1u64])],
[(v_id, product![1u64, 2u64, 2u64])],
)?;

// We should only have evaluated a single query
assert_eq!(metrics.delta_queries_evaluated, 1);
assert_eq!(metrics.delta_queries_matched, 1);

Ok(())
}

/// Test that we do not evaluate queries that return trivially empty results
///
/// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel.
Expand Down
Loading
Loading